你笑了

你的笑,是星星跳跃浪花的笑

0%

queryRunner管理事务

queryRunner

如果 RDBMS 支持连接池,则每个新的 queryRunner 实例从连接池中获取一个连接。 对于不支持连接池的数据库,它会在整个数据源中使用同一个连接
queryRunner 提供单个数据库连接,可组织管理事务

使用

1
2
3
4
5
6
7
8
// 创建实例
const qr = dataSource.createQueryRunner();
// 从连接池中获取连接
await qr.connect();
// 通过 manager 执行数据库操作
const users = await queryRunner.manager.find(User);
// 释放连接,释放后,qr 不能再继续执行数据库操作
await qr.release();

组织管理事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 开启事务
await queryRunner.startTransaction();

try {
// 一系列操作
await queryRunner.manager.save(user1);
await queryRunner.manager.save(user2);
await queryRunner.manager.save(photos);

// 提交事务
await queryRunner.commitTransaction();
} catch (err) {
// 回滚事务
await queryRunner.rollbackTransaction();
} finally {
// 释放连接
await queryRunner.release();
}

源码分析

创建 queryRunner 到建立连接的过程

typerom v0.3.17

mysql 2.18.1

typeorm

第一步 创建 queryRunner

1
const qr = dataSource.createQueryRunner();
data-source/DataSource.ts

数据源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 自己实例化或通过 nestjs 注入获取 dataSource 实例
constructor(options: DataSourceOptions) {
this.name = options.name || "default"
this.options = options
this.logger = new LoggerFactory().create(
this.options.logger,
this.options.logging,
)
this.driver = new DriverFactory().create(this) // 实例化时创建 driver,传入 dataSource 实例
this.manager = this.createEntityManager()
this.namingStrategy =
options.namingStrategy || new DefaultNamingStrategy()
this.metadataTableName = options.metadataTableName || "typeorm_metadata"
this.queryResultCache = options.cache
? new QueryResultCacheFactory(this).create()
: undefined
this.relationLoader = new RelationLoader(this)
this.relationIdLoader = new RelationIdLoader(this)
this.isInitialized = false
}

createQueryRunner(mode: ReplicationMode = "master"): QueryRunner {
const queryRunner = this.driver.createQueryRunner(mode) // 通过 driver 获取 queryRunner
const manager = this.createEntityManager(queryRunner)
Object.assign(queryRunner, { manager: manager })
return queryRunner
}
driver/mysql
MysqlDriver.ts

mysql 驱动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
constructor(connection: DataSource) { // 数据源实例即连接
this.connection = connection
this.options = {
legacySpatialSupport: true,
...connection.options, // 将传递给数据源的配置传给 mysql driver
} as MysqlConnectionOptions
this.isReplicated = this.options.replication ? true : false

this.loadDependencies()

this.database = DriverUtils.buildDriverOptions(
this.options.replication
? this.options.replication.master
: this.options,
).database
}

createQueryRunner(mode: ReplicationMode) {
return new MysqlQueryRunner(this, mode) // 实例化 queryRunner
}
MysqlQueryRunner.ts
1
2
3
4
5
6
7
8
// 最终获取的 queryRunner
constructor(driver: MysqlDriver, mode: ReplicationMode) {
super()
this.driver = driver
this.connection = driver.connection
this.broadcaster = new Broadcaster(this)
this.mode = mode
}

第二步 建立连接

1
await qr.connect();
driver/mysql/
MysqlQueryRunner.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
connect(): Promise<any> {
if (this.databaseConnection)
return Promise.resolve(this.databaseConnection)

if (this.databaseConnectionPromise)
return this.databaseConnectionPromise

if (this.mode === "slave" && this.driver.isReplicated) {
this.databaseConnectionPromise = this.driver
.obtainSlaveConnection()
.then((connection) => {
this.databaseConnection = connection
return this.databaseConnection
})
} else {
// master
this.databaseConnectionPromise = this.driver
.obtainMasterConnection()
.then((connection) => {
this.databaseConnection = connection
return this.databaseConnection
})
}

return this.databaseConnectionPromise
}
MysqlDriver.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
obtainMasterConnection(): Promise<any> {
return new Promise<any>((ok, fail) => {
if (this.poolCluster) {
this.poolCluster.getConnection(
"MASTER",
(err: any, dbConnection: any) => {
err
? fail(err)
: ok(this.prepareDbConnection(dbConnection))
},
)
} else if (this.pool) {
this.pool.getConnection((err: any, dbConnection: any) => { // 从 mysql/Pool.ts 获取连接
err ? fail(err) : ok(this.prepareDbConnection(dbConnection))
})
} else {
fail(
new TypeORMError(
`Connection is not established with mysql database`,
),
)
}
})
}

mysql

获取连接

lib/Pool.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// 连接池对象
function Pool(options) {
EventEmitter.call(this);
this.config = options.config;
this.config.connectionConfig.pool = this;

this._acquiringConnections = [];
this._allConnections = []; // 所有连接
this._freeConnections = []; // 空闲连接
this._connectionQueue = []; // 连接请求队列
this._closed = false;
}
// 获取连接
Pool.prototype.getConnection = function (cb) {
if (this._closed) {
var err = new Error("Pool is closed.");
err.code = "POOL_CLOSED";
process.nextTick(function () {
cb(err);
});
return;
}

var connection;
var pool = this;

if (this._freeConnections.length > 0) {
connection = this._freeConnections.shift();
this.acquireConnection(connection, cb);
return;
}

if (
this.config.connectionLimit === 0 ||
this._allConnections.length < this.config.connectionLimit
) {
connection = new PoolConnection(this, {
config: this.config.newConnectionConfig(),
}); // 这就是最终的连接

this._acquiringConnections.push(connection);
this._allConnections.push(connection);

connection.connect(
{ timeout: this.config.acquireTimeout },
function onConnect(err) {
spliceConnection(pool._acquiringConnections, connection);

if (pool._closed) {
err = new Error("Pool is closed.");
err.code = "POOL_CLOSED";
}

if (err) {
pool._purgeConnection(connection);
cb(err);
return;
}

pool.emit("connection", connection);
pool.emit("acquire", connection);
cb(null, connection);
}
);
return;
}

if (!this.config.waitForConnections) {
// 不等待直接返回异常
process.nextTick(function () {
var err = new Error("No connections available.");
err.code = "POOL_CONNLIMIT";
cb(err);
});
return;
}

this._enqueueCallback(cb); // 等待,则加入队列
};
lib/PoolConnection.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 连接对象
function PoolConnection(pool, options) {
Connection.call(this, options);
this._pool = pool;

if (Events.usingDomains) {
this.domain = pool.domain;
}

this.on("end", this._removeFromPool);
this.on("error", function (err) {
if (err.fatal) {
this._removeFromPool();
}
});
}

程序无响应问题

通过 queryRunner 手动管理事务,压测时遇到程序无响应问题,具体情况

  1. 程序可以收到请求,但是代码走到 typeorm 就无响应了
  2. 数据库可以正常响应其他客户端的查询请求

最小化代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
async update(data: UpdateConfigItemListDto) {
const { category_id, item_list } = data;

const qr = this.dataSource.createQueryRunner()

try {
await qr.connect()
await qr.startTransaction() // 开启事务

const newItemList = await Promise.all(itemList.map(async item => {
const { value, id, code } = item
try {
// 处理历史记录
const dr = await this.handleHistoryNum(id, qr)
// 删除最早的历史记录
dr && await qr.manager.update(ConfigItemValue, {
id: dr.id,
}, {
deleted: true,
})
// 插入当前记录
await qr.manager.insert(ConfigItemValue, {
config_item_id: id,
value,
category_id,
created_by: payload?.user_id
})

return { id, code, value }
} catch (error) {
this.logger.error(`update config item:${id} value:${value} error:`, error)
throw error
}
}))

await qr.commitTransaction()

return newItemList
} catch (error) {
await qr.rollbackTransaction()
throw new HttpErrorResponse(ApiErrorCode.MYSQL_ERROR, `更新配置异常`)
} finally {
await qr.release()
}
}

async handleHistoryNum(id: number, qr: QueryRunner) {
const num = await qr.manager.getRepository(ConfigItemValue)
// bug
// const num = await this.dataSource.getRepository(ConfigItemValue)
.count({
where: {
id,
deleted: false
}
})
// 其他逻辑

// 返回最早的历史记录
return row
}

分析

可能的原因

  1. 数据库连接被占满
  2. 死锁
  3. 服务器或应用级别存在资源竞争

疑惑:即使连接被占满、死锁,随着时间拉长,死锁被回滚、事务执行完之后也应该释放连接,不应该一直无响应

mysql 日志

通过 SHOW ENGINE INNODB STATUS\G 看到死锁

获取连接池信息

1
2
3
4
// 怀疑是连接占满了,添加打印连接池信息的日志
setInterval(() => {
console.log("Pool Info ==>", this.dataSource.driver["pool"]);
}, 1000);

mysql 连接池配置

  • waitForConnections

    当连接数为 0 时,连接池的行为

    • true 默认

      将新的连接请求放入队列中

    • false

      立即返回异常

  • acquireTimeout

    获取连接的超时时间,单位毫秒,默认 10000

  • connectionLimit

    最大连接个数

代码

高并发下,数据库连接很快被占满,当连接被占满时

  1. 因为配置 mysql 的参数 waitForConnections 默认为 true,所以新的连接请求会被放入队列中等待
  2. handleHistoryNum 方法中获取 num 是通过 this.dataSource 获取连接的,起初是想让查询请求放在事务外执行,但是方法中开启事务之后,所有的操作都放在事务中执行了,导致事务中还需要获取一个连接,而此时连接被占满,该连接请求被放入队列中,进而导致当前事务无法提交,连接无法释放,发生死锁

结论

  1. 事务中又获取连接,导致死循环,连接无法释放,程序一直卡死
  2. 开启事务之后,所有数据库操作都通过 qr.manager 执行
  3. 及时释放连接