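## Background

I have been reading a lot about Netty lately and am writing a lightweight RPC framework for practice. Along the way I found many highlights in Netty's source code; some of the implementations can fairly be called exacting. Netty's utility classes are also excellent and work out of the box. This post looks at one of my favorite areas, concurrency, through one of Netty's utility modules: `Promise`.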
Environment: Netty `4.1.44.Final` (the `netty-common` version used in the usage example in this post).
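## Introduction to Promise

A promise is a pledge: something one person tells another with a certain expectation attached, and which can generally be fulfilled.

The Javadoc of `io.netty.util.concurrent.Promise` consists of a single sentence: a special `io.netty.util.concurrent.Future` which is writable (`Promise` is a sub-interface of `io.netty.util.concurrent.Future`). `io.netty.util.concurrent.Future` in turn extends `java.util.concurrent.Future` and represents the result of an asynchronous operation. The `Future` in the JDK concurrency package is not writable and offers no hook for listeners (it does not apply the observer pattern); `Promise` fills both gaps. In terms of the inheritance hierarchy, `DefaultPromise` is the concrete class that ultimately implements these interfaces, so the source analysis should focus on `DefaultPromise`. Since a module's features are normally defined by its interfaces, let's first go through what the two interfaces offer:

- `io.netty.util.concurrent.Promise`
- `io.netty.util.concurrent.Future`

First, the `io.netty.util.concurrent.Future` interface: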
```java
public interface Future<V> extends java.util.concurrent.Future<V> {

    // Whether the asynchronous operation completed successfully
    boolean isSuccess();

    // Whether the operation can still be cancelled
    boolean isCancellable();

    // The failure cause, or null if the operation succeeded or has not completed yet
    Throwable cause();

    // Register one or more listeners that are notified when the operation completes
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    // Remove one or more listeners
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    // Wait for completion and rethrow the failure cause if the operation failed
    Future<V> sync() throws InterruptedException;
    Future<V> syncUninterruptibly();

    // Wait for completion without inspecting the outcome
    Future<V> await() throws InterruptedException;
    Future<V> awaitUninterruptibly();

    // Timed waits; return true if the operation completed within the timeout
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    boolean await(long timeoutMillis) throws InterruptedException;
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
    boolean awaitUninterruptibly(long timeoutMillis);

    // Non-blocking: the result if it is already available, otherwise null
    V getNow();

    // Cancel the operation
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}
```
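`sync()` and `await()` are similar. The difference is that `sync()` checks whether the operation failed and, if it did, rethrows the failure cause as soon as the wait ends, whereas `await()` is oblivious to failures.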
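The difference can be sketched with JDK classes only. `SyncVsAwait`, `fail()` and `demo()` are hypothetical names made up for this illustration, and the class is a simplified stand-in, not Netty code. One deliberate simplification: checked causes are wrapped here, while Netty's `sync()` rethrows the cause unchanged.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified stand-in for a future that can fail; not Netty code.
class SyncVsAwait {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile Throwable cause;

    // Complete the operation with a failure
    void fail(Throwable t) {
        cause = t;
        done.countDown();
    }

    // await(): only waits for completion and never looks at the outcome
    SyncVsAwait await() throws InterruptedException {
        done.await();
        return this;
    }

    // sync(): waits, then rethrows the failure cause if there is one
    SyncVsAwait sync() throws InterruptedException {
        await();
        Throwable t = cause;
        if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        }
        if (t instanceof Error) {
            throw (Error) t;
        }
        if (t != null) {
            // Simplification for this sketch: wrap checked causes
            throw new IllegalStateException(t);
        }
        return this;
    }

    // Runs both calls against a failed operation and describes what happened
    static String demo() {
        SyncVsAwait f = new SyncVsAwait();
        f.fail(new IllegalArgumentException("boom"));
        StringBuilder sb = new StringBuilder();
        try {
            f.await();
            sb.append("await: no exception");
            f.sync();
            sb.append("; sync: no exception");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            sb.append("; interrupted");
        } catch (RuntimeException e) {
            sb.append("; sync: ").append(e.getClass().getSimpleName());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In other words, code that calls `await()` has to inspect `isSuccess()` or `cause()` itself, while `sync()` surfaces the failure at the call site.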
Next, the `io.netty.util.concurrent.Promise` interface:
```java
public interface Promise<V> extends Future<V> {

    // Mark the promise as succeeded and notify all listeners;
    // throws IllegalStateException if it is already complete
    Promise<V> setSuccess(V result);

    // Like setSuccess, but returns false instead of throwing if already complete
    boolean trySuccess(V result);

    // Mark the promise as failed and notify all listeners;
    // throws IllegalStateException if it is already complete
    Promise<V> setFailure(Throwable cause);

    // Like setFailure, but returns false instead of throwing if already complete
    boolean tryFailure(Throwable cause);

    // Make the promise impossible to cancel
    boolean setUncancellable();

    // The methods below only narrow the return type from Future<V> to Promise<V>
    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> await() throws InterruptedException;

    @Override
    Promise<V> awaitUninterruptibly();

    @Override
    Promise<V> sync() throws InterruptedException;

    @Override
    Promise<V> syncUninterruptibly();
}
```
That covers everything the `Promise` interface offers. Next, let's go through the implementation of `Promise` in detail from the source code.
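To make the "writable and listenable" idea concrete before diving into Netty's code, here is a minimal JDK-only sketch. `MiniPromise` and its methods are hypothetical names invented for this illustration; it leaves out failures, cancellation, blocking waits and the executor, so it should not be read as how `DefaultPromise` works.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, heavily simplified illustration; not Netty's DefaultPromise.
class MiniPromise<V> {
    private final List<Consumer<V>> listeners = new ArrayList<>();
    private boolean done;
    private V value;

    // "Writable": the producer completes the promise; only the first call wins
    boolean trySuccess(V v) {
        List<Consumer<V>> toNotify;
        synchronized (this) {
            if (done) {
                return false;
            }
            done = true;
            value = v;
            toNotify = new ArrayList<>(listeners);
            listeners.clear();
        }
        // Call listeners outside the lock so a slow callback cannot block writers
        for (Consumer<V> l : toNotify) {
            l.accept(v);
        }
        return true;
    }

    // "Listenable": consumers register a callback instead of blocking on get()
    void addListener(Consumer<V> l) {
        V v;
        synchronized (this) {
            if (!done) {
                listeners.add(l);
                return;
            }
            v = value;
        }
        // Already complete: notify the late listener right away
        l.accept(v);
    }

    synchronized boolean isDone() {
        return done;
    }

    public static void main(String[] args) {
        MiniPromise<String> promise = new MiniPromise<>();
        promise.addListener(v -> System.out.println("early listener got: " + v));
        promise.trySuccess("hello");
        promise.addListener(v -> System.out.println("late listener got: " + v));
    }
}
```

Note that a listener added after completion is still notified, which is the same guarantee `addListener` gives in the Netty code analyzed later.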
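## Promise source code implementation

The implementation class of `Promise` is `io.netty.util.concurrent.DefaultPromise` (`DefaultPromise` has many subclasses of its own, some of them extensions tailored to specific scenarios), and `DefaultPromise` extends `io.netty.util.concurrent.AbstractFuture`: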
```java
public abstract class AbstractFuture<V> implements Future<V> {

    // Block until done, then return the value or throw according to the cause
    @Override
    public V get() throws InterruptedException, ExecutionException {
        await();

        Throwable cause = cause();
        if (cause == null) {
            return getNow();
        }
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        throw new ExecutionException(cause);
    }

    // Same as get(), but gives up with a TimeoutException when the wait times out
    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (await(timeout, unit)) {
            Throwable cause = cause();
            if (cause == null) {
                return getNow();
            }
            if (cause instanceof CancellationException) {
                throw (CancellationException) cause;
            }
            throw new ExecutionException(cause);
        }
        throw new TimeoutException();
    }
}
```
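`AbstractFuture` only implements `get()` and `get(long timeout, TimeUnit unit)`. The way these two methods map the outcome to a return value, a `CancellationException` or an `ExecutionException` is very similar to `java.util.concurrent.FutureTask`.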
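For comparison, the JDK behavior can be observed directly with `FutureTask`. The sketch below uses only JDK classes; `FutureTaskGetDemo` and `outcome()` are names made up for this example.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Shows how FutureTask.get() surfaces success, failure and cancellation.
class FutureTaskGetDemo {

    // Calls get() and describes how the outcome reached the caller
    static String outcome(FutureTask<String> task) {
        try {
            return "value: " + task.get();
        } catch (ExecutionException e) {
            return "ExecutionException wrapping " + e.getCause().getClass().getSimpleName();
        } catch (CancellationException e) {
            return "CancellationException";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        FutureTask<String> ok = new FutureTask<>(() -> "done");
        ok.run(); // run synchronously on the calling thread

        FutureTask<String> failed = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        failed.run();

        FutureTask<String> cancelled = new FutureTask<>(() -> "never");
        cancelled.cancel(false);

        System.out.println(outcome(ok));
        System.out.println(outcome(failed));
        System.out.println(outcome(cancelled));
    }
}
```

The three branches correspond one-to-one to the three branches in `AbstractFuture.get()`: return the value, rethrow a `CancellationException` as is, and wrap any other cause in an `ExecutionException`.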
`DefaultPromise` has a lot of source code, so we will read it in several parts. First, its fields and constructors:
```java
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
    private static final InternalLogger rejectedExecutionLogger =
            InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");

    // Maximum stack depth for nested listener notifications; never more than 8
    private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
            SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));

    // Atomic updater for the result field
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");

    // Marker stored in result when the promise succeeded with a null value
    private static final Object SUCCESS = new Object();
    // Marker stored in result when the promise has been made uncancellable
    private static final Object UNCANCELLABLE = new Object();
    // Shared holder stored in result when the promise is cancelled
    private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
            new CancellationException(), DefaultPromise.class, "cancel(...)"));
    private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();

    // The outcome: null, UNCANCELLABLE, SUCCESS, a CauseHolder, or the actual value
    private volatile Object result;
    // Executor used to notify listeners
    private final EventExecutor executor;
    // null, a single GenericFutureListener, or a DefaultFutureListeners; guarded by synchronized (this)
    private Object listeners;
    // Number of threads blocked in wait(); guarded by synchronized (this)
    private short waiters;
    // True while listeners are being notified; guarded by synchronized (this)
    private boolean notifyingListeners;

    public DefaultPromise(EventExecutor executor) {
        this.executor = checkNotNull(executor, "executor");
    }

    protected DefaultPromise() {
        // Only for subclasses
        executor = null;
    }

    // Wraps a failure cause so it can be told apart from a normal result value
    private static final class CauseHolder {
        final Throwable cause;

        CauseHolder(Throwable cause) {
            this.cause = cause;
        }
    }

    // CancellationException that reuses a pre-computed stack trace
    private static final class LeanCancellationException extends CancellationException {
        private static final long serialVersionUID = 2794674970981187807L;

        @Override
        public Throwable fillInStackTrace() {
            setStackTrace(CANCELLATION_STACK);
            return this;
        }

        @Override
        public String toString() {
            return CancellationException.class.getName();
        }
    }
}
```
`Promise` currently supports two kinds of listeners:

- `GenericFutureListener`: a `Future` listener with generics support.
- `GenericProgressiveFutureListener`: a sub-interface of `GenericFutureListener`; a generic `Future` listener that additionally reports progress (some operations take several steps, much like a progress bar).

```java
public interface GenericFutureListener<F extends Future<?>> extends EventListener {

    // Called when the operation associated with the future has completed
    void operationComplete(F future) throws Exception;
}

public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> {

    // Called when the operation has made progress
    void operationProgressed(F future, long progress, long total) throws Exception;
}
```
To let a `Promise` hold multiple listeners, Netty added the package-private class `DefaultFutureListeners`, which stores the listener instances in an array:
```java
final class DefaultFutureListeners {

    private GenericFutureListener<? extends Future<?>>[] listeners;
    private int size;
    // Number of GenericProgressiveFutureListener instances among the listeners
    private int progressiveSize;

    // Created only when a second listener is added, hence the two arguments
    @SuppressWarnings("unchecked")
    DefaultFutureListeners(GenericFutureListener<? extends Future<?>> first,
                           GenericFutureListener<? extends Future<?>> second) {
        listeners = new GenericFutureListener[2];
        listeners[0] = first;
        listeners[1] = second;
        size = 2;
        if (first instanceof GenericProgressiveFutureListener) {
            progressiveSize++;
        }
        if (second instanceof GenericProgressiveFutureListener) {
            progressiveSize++;
        }
    }

    public void add(GenericFutureListener<? extends Future<?>> l) {
        GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        final int size = this.size;
        // Double the array when it is full
        if (size == listeners.length) {
            this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
        }
        listeners[size] = l;
        this.size = size + 1;

        if (l instanceof GenericProgressiveFutureListener) {
            progressiveSize++;
        }
    }

    public void remove(GenericFutureListener<? extends Future<?>> l) {
        final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        int size = this.size;
        for (int i = 0; i < size; i++) {
            if (listeners[i] == l) {
                // Shift the following elements one slot to the left
                int listenersToMove = size - i - 1;
                if (listenersToMove > 0) {
                    System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
                }
                listeners[--size] = null;
                this.size = size;

                if (l instanceof GenericProgressiveFutureListener) {
                    progressiveSize--;
                }
                return;
            }
        }
    }

    public GenericFutureListener<? extends Future<?>>[] listeners() {
        return listeners;
    }

    public int size() {
        return size;
    }

    public int progressiveSize() {
        return progressiveSize;
    }
}
```
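Next come the remaining methods of `DefaultPromise`. I find that the order in which the methods appear in the source has a certain artistry to it. First, the methods that inspect the execution state of a `Promise`: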
```java
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    @Override
    public boolean setUncancellable() {
        // Not completed yet: CAS null -> UNCANCELLABLE
        if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
            return true;
        }
        Object result = this.result;
        // Otherwise true unless the promise is done and was cancelled
        return !isDone0(result) || !isCancelled0(result);
    }

    @Override
    public boolean isSuccess() {
        Object result = this.result;
        // Completed, and the result is not a failure holder
        return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
    }

    @Override
    public boolean isCancellable() {
        // Only cancellable while no result or marker has been set
        return result == null;
    }

    @Override
    public Throwable cause() {
        return cause0(result);
    }

    private Throwable cause0(Object result) {
        if (!(result instanceof CauseHolder)) {
            return null;
        }
        if (result == CANCELLATION_CAUSE_HOLDER) {
            // Replace the shared cancellation holder with a per-promise exception instance
            CancellationException ce = new LeanCancellationException();
            if (RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce))) {
                return ce;
            }
            // Another thread won the CAS; read what it stored
            result = this.result;
        }
        return ((CauseHolder) result).cause;
    }

    private static boolean isCancelled0(Object result) {
        return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
    }

    private static boolean isDone0(Object result) {
        return result != null && result != UNCANCELLABLE;
    }

    @Override
    public boolean isCancelled() {
        return isCancelled0(result);
    }

    @Override
    public boolean isDone() {
        return isDone0(result);
    }
}
```
Next, the methods that add and remove listeners (this part also contains the logic that notifies listeners):
```java
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    @Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        checkNotNull(listener, "listener");

        synchronized (this) {
            addListener0(listener);
        }

        // Already complete: notify right away so a late listener is not missed
        if (isDone()) {
            notifyListeners();
        }

        return this;
    }

    @Override
    public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
        checkNotNull(listeners, "listeners");

        synchronized (this) {
            for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
                // Stop at the first null element
                if (listener == null) {
                    break;
                }
                addListener0(listener);
            }
        }

        if (isDone()) {
            notifyListeners();
        }

        return this;
    }

    @Override
    public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
        checkNotNull(listener, "listener");

        synchronized (this) {
            removeListener0(listener);
        }

        return this;
    }

    @Override
    public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
        checkNotNull(listeners, "listeners");

        synchronized (this) {
            for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
                if (listener == null) {
                    break;
                }
                removeListener0(listener);
            }
        }

        return this;
    }

    private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        if (listeners == null) {
            // First listener: store it directly
            listeners = listener;
        } else if (listeners instanceof DefaultFutureListeners) {
            // Third or later listener: append to the array holder
            ((DefaultFutureListeners) listeners).add(listener);
        } else {
            // Second listener: upgrade to a DefaultFutureListeners
            listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
        }
    }

    private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        if (listeners instanceof DefaultFutureListeners) {
            ((DefaultFutureListeners) listeners).remove(listener);
        } else if (listeners == listener) {
            listeners = null;
        }
    }

    private void notifyListeners() {
        EventExecutor executor = executor();
        if (executor.inEventLoop()) {
            // On the event loop thread: notify directly while the nesting depth is below the limit
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }

        // Otherwise hand the notification over to the executor
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }

    private static void safeExecute(EventExecutor executor, Runnable task) {
        try {
            executor.execute(task);
        } catch (Throwable t) {
            rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
        }
    }

    private void notifyListenersNow() {
        Object listeners;
        synchronized (this) {
            // Nothing to do if a notification is in progress or there are no listeners
            if (notifyingListeners || this.listeners == null) {
                return;
            }
            notifyingListeners = true;
            // Take a snapshot and clear the field
            listeners = this.listeners;
            this.listeners = null;
        }
        for (;;) {
            if (listeners instanceof DefaultFutureListeners) {
                notifyListeners0((DefaultFutureListeners) listeners);
            } else {
                notifyListener0(this, (GenericFutureListener<?>) listeners);
            }
            synchronized (this) {
                if (this.listeners == null) {
                    // No listeners were added while notifying: done
                    notifyingListeners = false;
                    return;
                }
                // Listeners were added in the meantime: notify them in the next round
                listeners = this.listeners;
                this.listeners = null;
            }
        }
    }

    private void notifyListeners0(DefaultFutureListeners listeners) {
        GenericFutureListener<?>[] a = listeners.listeners();
        int size = listeners.size();
        for (int i = 0; i < size; i++) {
            notifyListener0(this, a[i]);
        }
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    private static void notifyListener0(Future future, GenericFutureListener l) {
        try {
            l.operationComplete(future);
        } catch (Throwable t) {
            // A throwing listener is logged and does not affect the other listeners
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
            }
        }
    }
}
```
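Two ideas in the listener code are worth isolating: the `listeners` field holds either a single listener or a container (so the common one-listener case allocates nothing extra), and the notification loop takes a snapshot and repeats until no listener was added in the meantime. The sketch below shows both with JDK classes only. `ListenerSlot`, `kind()` and `notifyNow()` are hypothetical names, `Runnable` stands in for `GenericFutureListener`, and synchronization is left out on purpose, so this is a single-threaded illustration rather than Netty's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded illustration of the "single or many" slot; not Netty code.
class ListenerSlot {
    // Either null, a single Runnable, or a List<Runnable>
    private Object listeners;

    @SuppressWarnings("unchecked")
    void add(Runnable l) {
        if (listeners == null) {
            listeners = l;                              // first listener: store directly
        } else if (listeners instanceof List) {
            ((List<Runnable>) listeners).add(l);        // third or later: append
        } else {
            List<Runnable> many = new ArrayList<>(2);   // second: upgrade to a container
            many.add((Runnable) listeners);
            many.add(l);
            listeners = many;
        }
    }

    String kind() {
        return listeners == null ? "empty" : listeners instanceof List ? "list" : "single";
    }

    // Notify a snapshot, then repeat for listeners that were added by the callbacks
    @SuppressWarnings("unchecked")
    void notifyNow() {
        Object snapshot = listeners;
        listeners = null;
        while (snapshot != null) {
            if (snapshot instanceof List) {
                for (Runnable r : (List<Runnable>) snapshot) {
                    r.run();
                }
            } else {
                ((Runnable) snapshot).run();
            }
            snapshot = listeners;
            listeners = null;
        }
    }

    public static void main(String[] args) {
        ListenerSlot slot = new ListenerSlot();
        slot.add(() -> System.out.println("first"));
        System.out.println(slot.kind());
        slot.add(() -> slot.add(() -> System.out.println("added during notification")));
        System.out.println(slot.kind());
        slot.notifyNow();
    }
}
```

Because the field is cleared before the callbacks run, a listener registered from inside a callback lands in a fresh slot and is picked up by the next loop iteration instead of mutating the collection being iterated.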
Then the `await()` and `sync()` family of methods:
```java
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    @Override
    public Promise<V> await() throws InterruptedException {
        // Already complete: return immediately
        if (isDone()) {
            return this;
        }

        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        // Blocking on the event loop thread would deadlock
        checkDeadLock();

        synchronized (this) {
            while (!isDone()) {
                incWaiters();
                try {
                    wait();
                } finally {
                    decWaiters();
                }
            }
        }
        return this;
    }

    @Override
    public Promise<V> awaitUninterruptibly() {
        if (isDone()) {
            return this;
        }

        checkDeadLock();

        boolean interrupted = false;
        synchronized (this) {
            while (!isDone()) {
                incWaiters();
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Remember the interrupt and keep waiting
                    interrupted = true;
                } finally {
                    decWaiters();
                }
            }
        }

        // Restore the interrupt flag for the caller
        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        return this;
    }

    @Override
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return await0(unit.toNanos(timeout), true);
    }

    @Override
    public boolean await(long timeoutMillis) throws InterruptedException {
        return await0(MILLISECONDS.toNanos(timeoutMillis), true);
    }

    @Override
    public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
        try {
            return await0(unit.toNanos(timeout), false);
        } catch (InterruptedException e) {
            // Should not be raised at all.
            throw new InternalError();
        }
    }

    @Override
    public boolean awaitUninterruptibly(long timeoutMillis) {
        try {
            return await0(MILLISECONDS.toNanos(timeoutMillis), false);
        } catch (InterruptedException e) {
            // Should not be raised at all.
            throw new InternalError();
        }
    }

    protected void checkDeadLock() {
        EventExecutor e = executor();
        if (e != null && e.inEventLoop()) {
            throw new BlockingOperationException(toString());
        }
    }

    @Override
    public Promise<V> sync() throws InterruptedException {
        await();
        rethrowIfFailed();
        return this;
    }

    @Override
    public Promise<V> syncUninterruptibly() {
        awaitUninterruptibly();
        rethrowIfFailed();
        return this;
    }

    private void incWaiters() {
        if (waiters == Short.MAX_VALUE) {
            throw new IllegalStateException("too many waiters: " + this);
        }
        ++waiters;
    }

    private void decWaiters() {
        --waiters;
    }

    private void rethrowIfFailed() {
        Throwable cause = cause();
        if (cause == null) {
            return;
        }

        PlatformDependent.throwException(cause);
    }

    private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
        if (isDone()) {
            return true;
        }

        if (timeoutNanos <= 0) {
            return isDone();
        }

        if (interruptable && Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        checkDeadLock();

        long startTime = System.nanoTime();
        long waitTime = timeoutNanos;
        boolean interrupted = false;
        try {
            for (;;) {
                synchronized (this) {
                    if (isDone()) {
                        return true;
                    }
                    incWaiters();
                    try {
                        // Split the remaining nanoseconds into milliseconds and nanoseconds
                        wait(waitTime / 1000000, (int) (waitTime % 1000000));
                    } catch (InterruptedException e) {
                        if (interruptable) {
                            throw e;
                        } else {
                            interrupted = true;
                        }
                    } finally {
                        decWaiters();
                    }
                }
                if (isDone()) {
                    return true;
                } else {
                    // Recompute the remaining time; give up once it is used up
                    waitTime = timeoutNanos - (System.nanoTime() - startTime);
                    if (waitTime <= 0) {
                        return isDone();
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```
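The core of the timed wait is a loop that recomputes the remaining time after every wake-up, because `Object.wait` may return early (spurious wake-ups, or a `notifyAll()` that concerns a different condition). A minimal JDK-only sketch of that loop follows; `TimedWait`, `complete()` and `demo()` are hypothetical names, and the waiter counting, deadlock check and uninterruptible mode of the real method are left out.

```java
// Hypothetical, simplified illustration of a deadline-based wait loop; not Netty code.
class TimedWait {
    private boolean done;

    synchronized void complete() {
        done = true;
        notifyAll(); // wake up every thread blocked in await(...)
    }

    // Waits up to timeoutNanos; returns true if complete() was called in time
    boolean await(long timeoutNanos) throws InterruptedException {
        long start = System.nanoTime();
        long waitTime = timeoutNanos;
        synchronized (this) {
            while (!done) {
                if (waitTime <= 0) {
                    return false; // time is used up
                }
                // wait(millis, nanos): split the remaining nanoseconds
                wait(waitTime / 1_000_000, (int) (waitTime % 1_000_000));
                // A wake-up does not imply completion: recompute what is left
                waitTime = timeoutNanos - (System.nanoTime() - start);
            }
            return true;
        }
    }

    // Returns {result of a wait nobody completes, result of a wait completed by another thread}
    static boolean[] demo() {
        TimedWait w = new TimedWait();
        try {
            boolean first = w.await(10_000_000L);        // 10 ms, nobody calls complete()
            new Thread(w::complete).start();
            boolean second = w.await(5_000_000_000L);    // completed long before 5 s
            return new boolean[] {first, second};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new boolean[] {false, false};
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("first wait completed: " + r[0] + ", second wait completed: " + r[1]);
    }
}
```

Measuring elapsed time with `System.nanoTime()` rather than the wall clock keeps the timeout immune to system clock adjustments, which is also what `await0` does.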
Finally, the methods that set and retrieve the result:
```java
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    @Override
    public Promise<V> setSuccess(V result) {
        if (setSuccess0(result)) {
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }

    @Override
    public boolean trySuccess(V result) {
        return setSuccess0(result);
    }

    @Override
    public Promise<V> setFailure(Throwable cause) {
        if (setFailure0(cause)) {
            return this;
        }
        throw new IllegalStateException("complete already: " + this, cause);
    }

    @Override
    public boolean tryFailure(Throwable cause) {
        return setFailure0(cause);
    }

    @SuppressWarnings("unchecked")
    @Override
    public V getNow() {
        Object result = this.result;
        // Failure holders and the two markers all map to null
        if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
            return null;
        }
        return (V) result;
    }

    @SuppressWarnings("unchecked")
    @Override
    public V get() throws InterruptedException, ExecutionException {
        Object result = this.result;
        if (!isDone0(result)) {
            await();
            result = this.result;
        }
        if (result == SUCCESS || result == UNCANCELLABLE) {
            return null;
        }
        Throwable cause = cause0(result);
        if (cause == null) {
            return (V) result;
        }
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        throw new ExecutionException(cause);
    }

    @SuppressWarnings("unchecked")
    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        Object result = this.result;
        if (!isDone0(result)) {
            if (!await(timeout, unit)) {
                throw new TimeoutException();
            }
            result = this.result;
        }
        if (result == SUCCESS || result == UNCANCELLABLE) {
            return null;
        }
        Throwable cause = cause0(result);
        if (cause == null) {
            return (V) result;
        }
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        throw new ExecutionException(cause);
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        // Cancellation only succeeds while result is still null
        if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
            if (checkNotifyWaiters()) {
                notifyListeners();
            }
            return true;
        }
        return false;
    }

    private boolean setSuccess0(V result) {
        // A null value is represented by the SUCCESS marker
        return setValue0(result == null ? SUCCESS : result);
    }

    private boolean setFailure0(Throwable cause) {
        return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
    }

    private boolean setValue0(Object objResult) {
        // The result can be set from the initial state (null) or from UNCANCELLABLE
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            if (checkNotifyWaiters()) {
                notifyListeners();
            }
            return true;
        }
        return false;
    }

    // Wakes up blocked waiters and reports whether there are listeners to notify
    private synchronized boolean checkNotifyWaiters() {
        if (waiters > 0) {
            notifyAll();
        }
        return listeners != null;
    }
}
```
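Taken together, the state checks and `setValue0` amount to a small state machine packed into one `volatile` field that is only ever changed by compare-and-set. The JDK-only sketch below models just that part. `StateCell`, `complete()` and `fail()` are hypothetical names chosen for this illustration; cancellation, listeners and waiters are left out.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical, simplified model of the single-field state encoding; not Netty code.
class StateCell {
    private static final AtomicReferenceFieldUpdater<StateCell, Object> RESULT_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(StateCell.class, Object.class, "result");
    private static final Object SUCCESS = new Object();        // completed with a null value
    private static final Object UNCANCELLABLE = new Object();  // pending, can no longer be cancelled

    // Wrapper that distinguishes a failure from a value that happens to be a Throwable
    private static final class CauseHolder {
        final Throwable cause;

        CauseHolder(Throwable cause) {
            this.cause = cause;
        }
    }

    // null = pending and cancellable; otherwise a marker, a CauseHolder or the value itself
    private volatile Object result;

    boolean setUncancellable() {
        return RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE);
    }

    boolean complete(Object value) {
        return setValue(value == null ? SUCCESS : value);
    }

    boolean fail(Throwable cause) {
        return setValue(new CauseHolder(cause));
    }

    // Only the first writer wins: the field moves away from null/UNCANCELLABLE exactly once
    private boolean setValue(Object v) {
        return RESULT_UPDATER.compareAndSet(this, null, v)
                || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, v);
    }

    boolean isDone() {
        Object r = result;
        return r != null && r != UNCANCELLABLE;
    }

    boolean isSuccess() {
        Object r = result;
        return r != null && r != UNCANCELLABLE && !(r instanceof CauseHolder);
    }

    boolean isCancellable() {
        return result == null;
    }

    public static void main(String[] args) {
        StateCell cell = new StateCell();
        System.out.println("cancellable at start: " + cell.isCancellable());
        cell.setUncancellable();
        System.out.println("done after setUncancellable: " + cell.isDone());
        System.out.println("first complete wins: " + cell.complete(null));
        System.out.println("later fail is rejected: " + !cell.fail(new RuntimeException("late")));
    }
}
```

Using marker objects for "null result" and "uncancellable" is what lets a single reference carry every state, so no extra status field or lock is needed on the write path.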
## Basic usage of Promise

Using Netty's `Promise` module does not require all of Netty's dependencies; `netty-common` alone is enough:
```xml
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-common</artifactId>
    <version>4.1.44.Final</version>
</dependency>
```
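As for the `EventExecutor`, Netty already ships a `GlobalEventExecutor` for global event processing, which can be used directly (you can of course implement `EventExecutor` yourself or use one of its other implementations):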
```java
EventExecutor executor = GlobalEventExecutor.INSTANCE;
Promise<String> promise = new DefaultPromise<>(executor);
```
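Here is a scenario: asynchronously download the resource behind a URL to disk; once the download finishes, asynchronously publish the path of the downloaded file, and print the download result to the console when the notification arrives.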
```java
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
import lombok.Data; // @Data requires Lombok on the classpath

public class PromiseMain {

    public static void main(String[] args) throws Exception {
        String url = "http://xxx.yyy.zzz";
        EventExecutor executor = GlobalEventExecutor.INSTANCE;
        Promise<DownloadResult> promise = new DefaultPromise<>(executor);
        promise.addListener(new DownloadResultListener());
        Thread thread = new Thread(() -> {
            try {
                System.out.println("Start downloading resource, url: " + url);
                long start = System.currentTimeMillis();
                // Simulate the download
                Thread.sleep(2000);
                String location = "C:\\xxx\\yyy\\z.md";
                long cost = System.currentTimeMillis() - start;
                System.out.println(String.format("Resource downloaded, url: %s, saved to: %s, cost: %d ms",
                        url, location, cost));
                DownloadResult result = new DownloadResult();
                result.setUrl(url);
                result.setFileDiskLocation(location);
                result.setCost(cost);
                // Publish the result; this triggers the listener
                promise.setSuccess(result);
            } catch (Exception e) {
                // Complete the promise on failure as well, so it is never left pending
                promise.tryFailure(e);
            }
        }, "Download-Thread");
        thread.start();
        // Keep the main thread alive
        Thread.sleep(Long.MAX_VALUE);
    }

    @Data
    private static class DownloadResult {

        private String url;
        private String fileDiskLocation;
        private long cost;
    }

    private static class DownloadResultListener implements GenericFutureListener<Future<DownloadResult>> {

        @Override
        public void operationComplete(Future<DownloadResult> future) throws Exception {
            if (future.isSuccess()) {
                DownloadResult downloadResult = future.getNow();
                System.out.println(String.format("Download complete notification, url: %s, file path on disk: %s, cost: %d ms",
                        downloadResult.getUrl(), downloadResult.getFileDiskLocation(), downloadResult.getCost()));
            }
        }
    }
}
```

Console output after running:

```
Start downloading resource, url: http://xxx.yyy.zzz
Resource downloaded, url: http://xxx.yyy.zzz, saved to: C:\xxx\yyy\z.md, cost: 2000 ms
Download complete notification, url: http://xxx.yyy.zzz, file path on disk: C:\xxx\yyy\z.md, cost: 2000 ms
```
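`Promise` fits many scenarios: besides asynchronous notification it can also be used for synchronous calls. Its design is far more flexible than the JUC `Future`, and it adds many new features on top of `Future`. If you need it, you can pull in this dependency on its own and use it directly.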
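## The Promise listener stack depth problem

Sometimes, because of layers of wrapping or plain coding mistakes, listener callbacks can form a chain across multiple `Promise` instances (see Issue-5302, "a promise listener chain"). The recursive calls can then nest deep enough to overflow the stack, so a threshold is needed to cap the recursion depth. Let's call it the stack depth protection threshold. It defaults to 8 and can be set through the system property `io.netty.defaultPromise.maxListenerStackDepth`. Note that because the field is initialized with `Math.min(8, ...)`, the property can only lower the threshold; values above 8 are capped at 8. Here is the code block shown earlier once more: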
```java
private void notifyListeners() {
    EventExecutor executor = executor();
    if (executor.inEventLoop()) {
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        final int stackDepth = threadLocals.futureListenerStackDepth();
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
                notifyListenersNow();
            } finally {
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            return;
        }
    }

    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}
```
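To build an example that triggers the listener stack depth protection, we only need to find a way to call `notifyListeners()` recursively on the same event loop thread.

The most typical case is calling `setSuccess()` on the next `Promise` from inside the listener callback of the previous `Promise` (think of nesting dolls). The original post illustrates this chain with a diagram here.

Test code: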
```java
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;

import java.util.concurrent.atomic.AtomicInteger;

public class PromiseListenerMain {

    private static final AtomicInteger COUNTER = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {
        // ImmediateEventExecutor runs tasks on the calling thread
        EventExecutor executor = ImmediateEventExecutor.INSTANCE;
        // root
        Promise<String> root = new DefaultPromise<>(executor);
        Promise<String> p1 = new DefaultPromise<>(executor);
        Promise<String> p2 = new DefaultPromise<>(executor);
        Promise<String> p3 = new DefaultPromise<>(executor);
        Promise<String> p4 = new DefaultPromise<>(executor);
        Promise<String> p5 = new DefaultPromise<>(executor);
        Promise<String> p6 = new DefaultPromise<>(executor);
        Promise<String> p7 = new DefaultPromise<>(executor);
        Promise<String> p8 = new DefaultPromise<>(executor);
        Promise<String> p9 = new DefaultPromise<>(executor);
        Promise<String> p10 = new DefaultPromise<>(executor);
        // Each listener completes the next promise in the chain
        p1.addListener(new Listener(p2));
        p2.addListener(new Listener(p3));
        p3.addListener(new Listener(p4));
        p4.addListener(new Listener(p5));
        p5.addListener(new Listener(p6));
        p6.addListener(new Listener(p7));
        p7.addListener(new Listener(p8));
        p8.addListener(new Listener(p9));
        p9.addListener(new Listener(p10));
        root.addListener(new Listener(p1));
        // Completing root starts the chain
        root.setSuccess("success");
        Thread.sleep(Long.MAX_VALUE);
    }

    private static class Listener implements GenericFutureListener<Future<String>> {

        private final String name;
        private final Promise<String> promise;

        public Listener(Promise<String> promise) {
            this.name = "listener-" + COUNTER.getAndIncrement();
            this.promise = promise;
        }

        @Override
        public void operationComplete(Future<String> future) throws Exception {
            System.out.println(String.format("Listener [%s] called back successfully...", name));
            if (null != promise) {
                promise.setSuccess("success");
            }
        }
    }
}
```

Because `safeExecute()` acts as a fallback, every listener in the chain above is still called. Using IDEA's advanced breakpoint feature to add extra log output at the two places where `notifyListenersNow()` is invoked gives the following output:

```
MAX_LISTENER_STACK_DEPTH(notifyListenersNow) executed---
Listener [listener-9] called back successfully...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow) executed---
Listener [listener-0] called back successfully...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow) executed---
Listener [listener-1] called back successfully...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow) executed---
Listener [listener-2] called back successfully...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow) executed---
Listener [listener-3] called back successfully...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow) executed---
Listener [listener-4] called back successfully...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow) executed---
Listener [listener-5] called back successfully...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow) executed---
Listener [listener-6] called back successfully...
safeExecute(notifyListenersNow) executed----------
Listener [listener-7] called back successfully...
safeExecute(notifyListenersNow) executed----------
Listener [listener-8] called back successfully...
```
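One thing puzzles me here: once the call depth reaches the threshold, the remaining notifications are wrapped in `Runnable` instances and submitted to the event executor. Doesn't that turn a stack overflow hazard into an out-of-memory hazard? Asynchronous tasks can pile up as well, unless task submission is rejected, so the answer depends on the concrete `EventExecutor` implementation.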
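The trade-off can be made visible with a small JDK-only model. `DepthGuard`, `chain()` and `drain()` are hypothetical names invented for this sketch, and the `ArrayDeque` merely stands in for the task queue of a queue-based executor; it does not reproduce the behavior of any particular Netty `EventExecutor`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of a depth-limited notification with a deferred-task queue; not Netty code.
class DepthGuard {
    static final int MAX_DEPTH = 8;
    private static final ThreadLocal<Integer> DEPTH = ThreadLocal.withInitial(() -> 0);

    // Stands in for the executor's task queue; it grows when notifications are deferred
    private final Queue<Runnable> deferred = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    void notifyListener(Runnable callback) {
        int depth = DEPTH.get();
        if (depth < MAX_DEPTH) {
            DEPTH.set(depth + 1);
            try {
                log.add("direct@" + depth);
                callback.run(); // nested call: the stack grows by one level
            } finally {
                DEPTH.set(depth);
            }
            return;
        }
        // Depth limit reached: the stack stops growing, the queue grows instead
        log.add("deferred");
        deferred.add(callback);
    }

    // Runs the deferred callbacks after the stack has unwound
    void drain() {
        Runnable task;
        while ((task = deferred.poll()) != null) {
            task.run();
        }
    }

    // Builds a chain of nested notifications, like listeners completing the next promise
    void chain(int remaining) {
        if (remaining == 0) {
            return;
        }
        notifyListener(() -> chain(remaining - 1));
    }

    public static void main(String[] args) {
        DepthGuard guard = new DepthGuard();
        guard.chain(10);
        guard.drain();
        System.out.println(guard.log);
    }
}
```

In this model the stack depth stays bounded no matter how long the chain is, while each deferral costs one queued task. Under these assumptions the queue holds only the pending continuation of each chain at any time, not the whole chain, but many long chains running at once could still build up a backlog, which is the concern raised above.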
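## Summary

That concludes the analysis of the source code and usage of Netty's `Promise` utility. Both its design and its code are well worth learning from, and since it works out of the box it can be pulled straight into everyday code, saving the effort and risk of reinventing the wheel.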
(End of article. e-a-20200123 c-3-d)