Scala Future 异步工具类解读
简介
将任务提交到线程池,来达到异步执行到效果
流程解析
初始化
先看一个 Future 用法的 例子
1 | Future { println("start future") } |
在 scala 的语法里,以下三种写法作用一样
1 | Future {} |
进入到 Future.apply 方法
1 | def apply[T](body: =>T)(implicit ('execctx) executor: ExecutionContext): Future[T] = |
unit 为
1 | val unit: Future[Unit] = successful(()) |
再看 successful
1 | def successful[T](result: T): Future[T] = Promise.successful(result).future |
进入到 Promise.successful()
1 | def successful[T](result: T): Promise[T] = fromTry(Success(result)) |
进入到 Promise.KeptPromise,
1 | def apply[T](result: Try[T]): scala.concurrent.Promise[T] = |
最终构造了一个 Kept 对象 Kept 是 Promise 的子类,Promise 是 Future 的子类
1 | Kept[T] extends Promise[T] |
执行任务 body: => T
我们重回 Future.apply, 看看 unit.map(_ => body) 的逻辑
1 | def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = transform(_ map f) |
transform是一个抽象方法,所以我们去看 子类 Promise.transform 的实现
1 | import scala.concurrent.Future |
根据 DefaultPromise 类的定义
1 | class DefaultPromise[T] extends AtomicReference[AnyRef](Nil) with Promise[T] |
接着我们看 DefaultPromise.onComplete 的实现, DefaultPromise 是 AtomicReference 无锁的对象引用
的子类
1 | final def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = dispatchOrAddCallback(new CallbackRunnable[T](executor.prepare(), func)) |
最终 runable.executeWithValue 执行,也就是 CallbackRunnable.executeWithValue
提交任务到线程池去执行
1 | private final class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: Try[T] => Any) extends Runnable with OnCompleteRunnable { |
看callbackRunnalbe的定义, 函数onComplete的实现为 Promise.transform 的实现中的代码
1 | result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) |
最后 p.complete 会返回一个 Promise 对象也就是 Future对象本身
1 | def complete(result: Try[T]): this.type = |
总结
Future 执行流程
- 创建Future 交给 Promise 对象管理,并将线程池引用传入到 Promise 对象中
- Promise 对 Future 里的任务进行调度执行