背景
分布式服务系统中,通过透传一个 traceId,就可以做到链路追踪
服务与服务之间调用链路可以很清晰的描述出服务与服务之间的依赖关系
但是 系统内部如何 把 这个 traceId 在线程之间传递,并封装在返回结果中?
实现调用链
如果是同步调用,一个线程处理所有请求,此时可以用跟 线程绑定的 ThreadLocal 来实现, 这个很好理解
如果是异步调用,一个事务处理逻辑中会有多个线程参与执行,我们得想办法 将 threadLocal 在多个线程之间进行传递
先看下 scala 中 如何实现 异步, Future 的详细实现 可参考 Scala Future 异步工具类解读
1 2 3 4 5
| import scala.concurrent.ExecutionContext.Implicits.global
Future { println("hh") }
|
概括来讲, Future的执行 实际上是 交给 ExecutionContext.execute 去做的
1 2 3 4 5
| private[scala] class ExecutionContextImpl private[impl] (val executor: java.util.concurrent.Executor, val reporter: Throwable => Unit) extends ExecutionContextExecutor { require(executor ne null, "Executor must not be null") override def execute(runnable: Runnable) = executor execute runnable override def reportFailure(t: Throwable) = reporter(t) }
|
executor execute runnable 就是把 Future 的任务 转变成 Runable 去交给底层的 线程池去执行
大部分的 Java 的业务系统都是扩展了 Java 底层线程池的逻辑
但是 Scala 已经 抽象出一层 ExecutionContext, 通过实现 ExecutionContext 的一个子类,来重写 execute 方法, 把 ExecutionContext.execute 之前的线程的 threadlocal 传给 执行 execute 的线程就 Ok了
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| case class TraceExecuteContext(executionContext: ExecutionContext) extends ExecutionContext { override def execute(runnable: Runnable): Unit = { val currentThread = ThreadContextUtil.getContext executionContext.execute(new Runnable { def run(): Unit = { currentThread.foreach(thread => ThreadContextUtil.copyContext(thread.copy(tid = Thread.currentThread().getId.toString))) runnable.run() } }) }
override def reportFailure(cause: Throwable): Unit = executionContext.reportFailure(cause)
}
|
扩展
Play Framework
playframework2.6 底层基于 Akka 实现,我们需要替换掉 akka 的 ExecutionContext
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| class TracePropagatingDispatcher( config: MessageDispatcherConfigurator, id: String, throughput: Int, throughputDeadlineTime: Duration, executorServiceFactoryProvider: ExecutorServiceFactoryProvider, shutdownTimeout: FiniteDuration) extends Dispatcher(config, id, throughput, throughputDeadlineTime, executorServiceFactoryProvider, shutdownTimeout) { self =>
override def prepare(): ExecutionContext = TraceExecuteContext.traceExecuteContext
override def reportFailure(t: Throwable): Unit = self.reportFailure(t) }
class TracePropagatingDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends MessageDispatcherConfigurator(config, prerequisites) { val instance = new TracePropagatingDispatcher( this, config.getString("id"), config.getInt("throughput"), config.getDuration("throughput-deadline-time", TimeUnit.NANOSECONDS).nanosecond, configureExecutor(), config.getDuration("shutdown-timeout", TimeUnit.MICROSECONDS).millisecond )
override def dispatcher(): MessageDispatcher = instance
}
|
在 reference.conf 里面 配置 akka 的 default-dispatcher 为 TracePropagatingDispatcherConfigurator
1 2 3 4 5 6 7
| akka { actor { default-dispatcher = { type = "io.github.wtog.strace.akka.TracePropagatingDispatcherConfigurator" } } }
|
函数式编程 天然适合做 AOP,Play 的 Filter 基于此,可以拦截到所有请求,可以在 filter 里面 将 traceId 初始化,这样 就可以将外部的 traceid,传给内部
1 2 3 4 5 6 7 8 9 10
| class TraceFilter @Inject() (configuration: Configuration)(implicit override val mat: Materializer, ec: TraceExecuteContext) extends LazyLogging { def apply(f: RequestHeader => Future[Result])(rh: RequestHeader): Future[Result] = { val traceId = rh.headers.get("traceId") ThreadContextUtil(traceId) f(rh).map {r => ThreadContextUtil.cleanContext() r.withHeaders("traceId" -> traceId) } } }
|
代码地址 https://github.com/wtog/strace