Slick 死锁分析

现象

slick3.1版本中,并发执行带事务的DBIOAction, 线程会卡住, 导致DB连接被占满, 产生死锁。

日志分析

  • 应用日志

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    2019-04-17 20:30:26.197 test [DEBUG] from slick.backend.DatabaseComponent.action in main - #1: schema.create [create table "TEST" ("ID" INTEGER NOT NULL)]
    2019-04-17 20:30:26.462 test [DEBUG] from slick.backend.DatabaseComponent.action in main - #2: SingleInsertAction [insert into "TEST" ("ID") values (?)]
    2019-04-17 20:30:26.464 test [DEBUG] from slick.jdbc.JdbcBackend.statement in h2mem1-1 - Preparing statement: insert into "TEST" ("ID") values (?)
    2019-04-17 20:30:26.466 test [DEBUG] from slick.backend.DatabaseComponent.action in main - #1: StartTransaction
    2019-04-17 20:30:26.467 test [DEBUG] from slick.backend.DatabaseComponent.action in h2mem1-2 - #2: SingleInsertAction [insert into "TEST" ("ID") values (?)]
    2019-04-17 20:30:26.468 test [DEBUG] from slick.jdbc.JdbcBackend.statement in h2mem1-2 - Preparing statement: insert into "TEST" ("ID") values (?)
    2019-04-17 20:30:26.468 test [DEBUG] from slick.backend.DatabaseComponent.action in main - #1: StartTransaction
    2019-04-17 20:30:26.471 test [DEBUG] from slick.backend.DatabaseComponent.action in h2mem1-2 - #2: SingleInsertAction [insert into "TEST" ("ID") values (?)]
    2019-04-17 20:30:26.471 test [DEBUG] from slick.jdbc.JdbcBackend.statement in h2mem1-2 - Preparing statement: insert into "TEST" ("ID") values (?)
    2019-04-17 20:30:26.474 test [DEBUG] from slick.backend.DatabaseComponent.action in main - #1: StartTransaction
    2019-04-17 20:30:26.475 test [DEBUG] from slick.backend.DatabaseComponent.action in h2mem1-1 - #2: SingleInsertAction [insert into "TEST" ("ID") values (?)]
    2019-04-17 20:30:26.475 test [DEBUG] from slick.jdbc.JdbcBackend.statement in h2mem1-2 - Preparing statement: insert into "TEST" ("ID") values (?)
    2019-04-17 20:30:26.476 test [DEBUG] from slick.backend.DatabaseComponent.action in main - #1: StartTransaction
    2019-04-17 20:30:26.479 test [DEBUG] from slick.backend.DatabaseComponent.action in h2mem1-1 - #2: SingleInsertAction [insert into "TEST" ("ID") values (?)]
    2019-04-17 20:30:26.480 test [DEBUG] from slick.jdbc.JdbcBackend.statement in h2mem1-2 - Preparing statement: insert into "TEST" ("ID") values (?)
    2019-04-17 20:30:26.485 test [DEBUG] from slick.backend.DatabaseComponent.action in main - #1: StartTransaction
    2019-04-17 20:30:26.488 test [DEBUG] from slick.backend.DatabaseComponent.action in h2mem1-1 - #2: SingleInsertAction [insert into "TEST" ("ID") values (?)]
    2019-04-17 20:30:26.488 test [DEBUG] from slick.jdbc.JdbcBackend.statement in h2mem1-2 - Preparing statement: insert into "TEST" ("ID") values (?)
    2019-04-17 20:30:26.492 test [DEBUG] from slick.backend.DatabaseComponent.action in main - #1: StartTransaction
    2019-04-17 20:30:26.495 test [DEBUG] from slick.backend.DatabaseComponent.action in main - #2: SingleInsertAction [insert into "TEST" ("ID") values (?)]
    2019-04-17 20:30:26.495 test [DEBUG] from slick.jdbc.JdbcBackend.statement in h2mem1-1 - Preparing statement: insert into "TEST" ("ID") values (?)
    2019-04-17 20:30:26.499 test [DEBUG] from slick.backend.DatabaseComponent.action in main - #1: StartTransaction
    2019-04-17 20:30:26.502 test [DEBUG] from slick.backend.DatabaseComponent.action in h2mem1-2 - #2: SingleInsertAction [insert into "TEST" ("ID") values (?)]
    2019-04-17 20:30:26.502 test [DEBUG] from slick.jdbc.JdbcBackend.statement in h2mem1-2 - Preparing statement: insert into "TEST" ("ID") values (?)
    2019-04-17 20:30:26.506 test [DEBUG] from slick.backend.DatabaseComponent.action in main - #1: StartTransaction
    2019-04-17 20:30:26.509 test [DEBUG] from slick.backend.DatabaseComponent.action in h2mem1-1 - #2: SingleInsertAction [insert into "TEST" ("ID") values (?)]
    2019-04-17 20:30:26.509 test [DEBUG] from slick.jdbc.JdbcBackend.statement in h2mem1-2 - Preparing statement: insert into "TEST" ("ID") values (?)
    2019-04-17 20:30:26.515 test [DEBUG] from slick.backend.DatabaseComponent.action in main - #1: StartTransaction
    2019-04-17 20:30:26.517 test [DEBUG] from slick.backend.DatabaseComponent.action in h2mem1-2 - #2: SingleInsertAction [insert into "TEST" ("ID") values (?)]
    2019-04-17 20:30:26.518 test [DEBUG] from slick.jdbc.JdbcBackend.statement in h2mem1-2 - Preparing statement: insert into "TEST" ("ID") values (?)
    2019-04-17 20:30:26.528 test [DEBUG] from slick.backend.DatabaseComponent.action in main - #1: StartTransaction
    2019-04-17 20:30:26.534 test [DEBUG] from slick.backend.DatabaseComponent.action in main - #1: StartTransaction
    2019-04-17 20:30:26.537 test [DEBUG] from slick.backend.DatabaseComponent.action in main - #1: StartTransaction
    2019-04-17 20:30:26.541 test [DEBUG] from slick.backend.DatabaseComponent.action in main - #1: StartTransaction
    2019-04-17 20:30:26.547 test [DEBUG] from slick.backend.DatabaseComponent.action in main - #1: StartTransaction
    waiting
    2019-04-17 20:30:26.644 test [DEBUG] from slick.backend.DatabaseComponent.action in ForkJoinPool-1-worker-7 - #3: result [select count(1) from "TEST"]
    2019-04-17 20:30:26.644 test [DEBUG] from slick.backend.DatabaseComponent.action in ForkJoinPool-1-worker-1 - #3: result [select count(1) from "TEST"]
    2019-04-17 20:30:26.644 test [DEBUG] from slick.backend.DatabaseComponent.action in ForkJoinPool-1-worker-5 - #3: result [select count(1) from "TEST"]
    2019-04-17 20:30:26.644 test [DEBUG] from slick.backend.DatabaseComponent.action in ForkJoinPool-1-worker-3 - #3: result [select count(1) from "TEST"]
    2019-04-17 20:30:26.648 test [DEBUG] from slick.backend.DatabaseComponent.action in ForkJoinPool-1-worker-7 - #3: result [select count(1) from "TEST"]
    2019-04-17 20:30:26.648 test [DEBUG] from slick.backend.DatabaseComponent.action in ForkJoinPool-1-worker-5 - #3: result [select count(1) from "TEST"]
    2019-04-17 20:30:26.649 test [DEBUG] from slick.backend.DatabaseComponent.action in ForkJoinPool-1-worker-3 - #3: result [select count(1) from "TEST"]
    2019-04-17 20:30:26.651 test [DEBUG] from slick.backend.DatabaseComponent.action in ForkJoinPool-1-worker-1 - #3: result [select count(1) from "TEST"]
    2019-04-17 20:30:26.652 test [DEBUG] from slick.backend.DatabaseComponent.action in ForkJoinPool-1-worker-7 - #3: result [select count(1) from "TEST"]
    2019-04-17 20:30:26.652 test [DEBUG] from slick.backend.DatabaseComponent.action in ForkJoinPool-1-worker-3 - #3: result [select count(1) from "TEST"]

    只有 StartTransaction,却没有 Commit

  • jstack 日志

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    "RMI TCP Connection(6)-192.168.0.100" #32 daemon prio=9 os_prio=31 tid=0x00007f8c01975800 nid=0x6b03 runnable [0x0000700004c86000]
    java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:170)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    - locked <merged>(a java.io.BufferedInputStream)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:550)
    at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:826)
    at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$256(TCPTransport.java:683)
    at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler$$Lambda$1/672224267.run(Unknown Source)
    at java.security.AccessController.doPrivileged(Native Method)
    at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:682)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    Locked ownable synchronizers: - <merged>(a java.util.concurrent.ThreadPoolExecutor$Worker)

    "h2mem1-2" #19 daemon prio=5 os_prio=31 tid=0x00007f8bfe2f4000 nid=0x1407 waiting on condition [0x0000700002d27000]
    java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <merged>(a com.zaxxer.hikari.util.Java6ConcurrentBag$Synchronizer)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedLongSynchronizer.doAcquireSharedNanos(AbstractQueuedLongSynchronizer.java:815)
    at java.util.concurrent.locks.AbstractQueuedLongSynchronizer.tryAcquireSharedNanos(AbstractQueuedLongSynchronizer.java:1106)
    at com.zaxxer.hikari.util.ConcurrentBag.borrow(ConcurrentBag.java:134)
    at com.zaxxer.hikari.pool.BaseHikariPool.getConnection(BaseHikariPool.java:201)
    at com.zaxxer.hikari.pool.BaseHikariPool.getConnection(BaseHikariPool.java:182)
    at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:93)
    at slick.jdbc.hikaricp.HikariCPJdbcDataSource.createConnection(HikariCPJdbcDataSource.scala:12)
    at slick.jdbc.JdbcBackend$BaseSession.conn$lzycompute(JdbcBackend.scala:415)
    - locked <merged>(a slick.jdbc.JdbcBackend$BaseSession)
    at slick.jdbc.JdbcBackend$BaseSession.conn(JdbcBackend.scala:414)
    at slick.jdbc.JdbcBackend$BaseSession.startInTransaction(JdbcBackend.scala:437)
    at slick.driver.JdbcActionComponent$StartTransaction$.run(JdbcActionComponent.scala:41)
    at slick.driver.JdbcActionComponent$StartTransaction$.run(JdbcActionComponent.scala:38)
    at slick.backend.Dat abaseComponent$DatabaseDef$$anon$2.liftedTree1$1(DatabaseComponent.scala:237)
    at slick.backend.DatabaseComponent$DatabaseDef$$anon$2.run(DatabaseComponent.scala:237)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    Locked ownable synchronizers: - <merged>(a java.util.concurrent.ThreadPoolExecutor$Worker)
    • java.io.BufferedInputStream

      • 线程阻塞在IO上,其他线程一直在获取数据库连接
    • com.zaxxer.hikari.pool.BaseHikariPool.getConnection(BaseHikariPool.java:201)

      • 猜测原因是部分线程开启事物但是没提交,占着数据库连接资源不放,导致后续的db操作尝试获取db连接,还一直都拿不到,就造成了阻塞现象

源码分析

我们看 action.transactionally 实际返回的是一个链式的 DBIOAction

1
2
3
4
SynchronousDatabaseAction.fuseUnsafe(
StartTransaction.andThen(a).cleanUp(eo => if(eo.isEmpty) Commit else Rollback)(DBIO.sameThreadExecutionContext)
.asInstanceOf[DBIOAction[R, S, E with Effect.Transactional]]
)

其中的 a 就是下面的 action db真实的操作语句

1
2
3
val action = {testTable += i}
.flatMap { _ => testTable.length.result }
.flatMap { _ => DBIO.successful(s"insert successfully $i") }

最后以StartTransaction为开头的以CleanUpAction为结尾DBIOAction链,走到递归方法
runInContext[R](a: DBIOAction[R, NoStream, Nothing], ctx: Context, streaming: Boolean, topLevel: Boolean): Future[R]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
protected[this] def runInContext[R](a: DBIOAction[R, NoStream, Nothing], ctx: Context, streaming: Boolean, topLevel: Boolean): Future[R] = {
logAction(a, ctx)
a match {
case SuccessAction(v) => Future.successful(v)
case FailureAction(t) => Future.failed(t)
case FutureAction(f) => f
case FlatMapAction(base, f, ec) =>
runInContext(base, ctx, false, topLevel).flatMap(v => runInContext(f(v), ctx, streaming, false))(ctx.getEC(ec))
case AndThenAction(actions) =>
val last = actions.length - 1
def run(pos: Int, v: Any): Future[Any] = {
val f1 = runInContext(actions(pos), ctx, streaming && pos == last, pos == 0)
if(pos == last) f1
else f1.flatMap(run(pos + 1, _))(DBIO.sameThreadExecutionContext)
}
run(0, null).asInstanceOf[Future[R]]
case sa @ SequenceAction(actions) =>
val len = actions.length
val results = new AtomicReferenceArray[Any](len)
def run(pos: Int): Future[Any] = {
if(pos == len) Future.successful {
val b = sa.cbf()
var i = 0
while(i < len) {
b += results.get(i)
i += 1
}
b.result()
}
else runInContext(actions(pos), ctx, false, pos == 0).flatMap { (v: Any) =>
results.set(pos, v)
run(pos + 1)
} (DBIO.sameThreadExecutionContext)
}
run(0).asInstanceOf[Future[R]]
case CleanUpAction(base, f, keepFailure, ec) =>
val p = Promise[R]()
runInContext(base, ctx, streaming, topLevel).onComplete { t1 =>
try {
val a2 = f(t1 match {
case Success(_) => None
case Failure(t) => Some(t)
})
runInContext(a2, ctx, false, false).onComplete { t2 =>
if(t2.isFailure && (t1.isSuccess || !keepFailure)) p.complete(t2.asInstanceOf[Failure[R]])
else p.complete(t1)
} (DBIO.sameThreadExecutionContext)
} catch {
case NonFatal(ex) =>
throw (t1 match {
case Failure(t) if keepFailure => t
case _ => ex
})
}
} (ctx.getEC(ec))
p.future
case FailedAction(a) =>
runInContext(a, ctx, false, topLevel).failed.asInstanceOf[Future[R]]
case AsTryAction(a) =>
val p = Promise[R]()
runInContext(a, ctx, false, topLevel).onComplete(v => p.success(v.asInstanceOf[R]))(DBIO.sameThreadExecutionContext)
p.future
case NamedAction(a, _) =>
runInContext(a, ctx, streaming, topLevel)
case a: SynchronousDatabaseAction[_, _, _, _] =>
if(streaming) {
if(a.supportsStreaming) streamSynchronousDatabaseAction(a.asInstanceOf[SynchronousDatabaseAction[_, _ <: NoStream, This, _ <: Effect]], ctx.asInstanceOf[StreamingContext], !topLevel).asInstanceOf[Future[R]]
else runInContext(CleanUpAction(AndThenAction(Vector(DBIO.Pin, a.nonFusedEquivalentAction)), _ => DBIO.Unpin, true, DBIO.sameThreadExecutionContext), ctx, streaming, topLevel)
} else runSynchronousDatabaseAction(a.asInstanceOf[SynchronousDatabaseAction[R, NoStream, This, _]], ctx, !topLevel)
case a: DatabaseAction[_, _, _] =>
throw new SlickException(s"Unsupported database action $a for $this")
}
}

CleanUpAction(base, f, keepFailure, ec) 中的 f 就是 eo => if(eo.isEmpty) Commit else Rollback
根据是否有异常来决定 Commit 或是 Rollback
runSynchronousDatabaseAction(a.asInstanceOf[SynchronousDatabaseAction[R,NoStream,This,_]],ctx,!topLevel) 的逻辑是将任务提交到线程池中去执行
先申请连接,然后在执行自己内部的 run 方法,释放连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
protected[this] def runSynchronousDatabaseAction[R](a: SynchronousDatabaseAction[R, NoStream, This, _], ctx: Context, highPrio: Boolean): Future[R] = {
val promise = Promise[R]()
ctx.getEC(synchronousExecutionContext).prepare.execute(new AsyncExecutor.PrioritizedRunnable {
def highPriority = highPrio
def run: Unit =
try {
ctx.sync
val res = try {
acquireSession(ctx)
val res = try a.run(ctx) catch { case NonFatal(ex) =>
releaseSession(ctx, true)
throw ex
}
releaseSession(ctx, false)
res
} finally { ctx.sync = 0 }
promise.success(res)
} catch { case NonFatal(ex) => promise.tryFailure(ex) }
})
promise.future
}

结论

多个以StartTransaction为开头的以CleanUpAction为结尾的DBIOAction链提交到线程池里面去,线程池内都是与DB交互的IO阻塞性任务
具有事务提交逻辑的CleanUpAction 在 DBIOAction链的末尾,极有可能拿不到线程池中的优先执行权限,占着线程池内的资源的其他线程没有事务提交,不会释放掉DB连接。
造成数据库连接池连接资源紧张,其他线程就一直处在申请连接的阻塞状态

备注

  • slick 默认的数据库连接池选择的是 HikariCP
  • numThread的配置同样也是数据库连接池里面默认的 db 最小连接数的配置