如果 RDBMS 支持连接池,则每个新的 queryRunner 实例从连接池中获取一个连接。 对于不支持连接池的数据库,它会在整个数据源中使用同一个连接
queryRunner 提供单个数据库连接,可组织管理事务
使用
1 2 3 4 5 6 7 8
| const qr = dataSource.createQueryRunner();
await qr.connect();
const users = await queryRunner.manager.find(User);
await qr.release();
|
组织管理事务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| await queryRunner.startTransaction();
try { await queryRunner.manager.save(user1); await queryRunner.manager.save(user2); await queryRunner.manager.save(photos);
await queryRunner.commitTransaction(); } catch (err) { await queryRunner.rollbackTransaction(); } finally { await queryRunner.release(); }
|
源码分析
创建 queryRunner 到建立连接的过程
typerom v0.3.17
mysql 2.18.1
typeorm
第一步 创建 queryRunner
1
| const qr = dataSource.createQueryRunner();
|
data-source/DataSource.ts
数据源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| constructor(options: DataSourceOptions) { this.name = options.name || "default" this.options = options this.logger = new LoggerFactory().create( this.options.logger, this.options.logging, ) this.driver = new DriverFactory().create(this) this.manager = this.createEntityManager() this.namingStrategy = options.namingStrategy || new DefaultNamingStrategy() this.metadataTableName = options.metadataTableName || "typeorm_metadata" this.queryResultCache = options.cache ? new QueryResultCacheFactory(this).create() : undefined this.relationLoader = new RelationLoader(this) this.relationIdLoader = new RelationIdLoader(this) this.isInitialized = false }
createQueryRunner(mode: ReplicationMode = "master"): QueryRunner { const queryRunner = this.driver.createQueryRunner(mode) const manager = this.createEntityManager(queryRunner) Object.assign(queryRunner, { manager: manager }) return queryRunner }
|
driver/mysql
MysqlDriver.ts
mysql 驱动
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| constructor(connection: DataSource) { this.connection = connection this.options = { legacySpatialSupport: true, ...connection.options, } as MysqlConnectionOptions this.isReplicated = this.options.replication ? true : false
this.loadDependencies()
this.database = DriverUtils.buildDriverOptions( this.options.replication ? this.options.replication.master : this.options, ).database }
createQueryRunner(mode: ReplicationMode) { return new MysqlQueryRunner(this, mode) }
|
MysqlQueryRunner.ts
1 2 3 4 5 6 7 8
| constructor(driver: MysqlDriver, mode: ReplicationMode) { super() this.driver = driver this.connection = driver.connection this.broadcaster = new Broadcaster(this) this.mode = mode }
|
第二步 建立连接
driver/mysql/
MysqlQueryRunner.ts
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| connect(): Promise<any> { if (this.databaseConnection) return Promise.resolve(this.databaseConnection)
if (this.databaseConnectionPromise) return this.databaseConnectionPromise
if (this.mode === "slave" && this.driver.isReplicated) { this.databaseConnectionPromise = this.driver .obtainSlaveConnection() .then((connection) => { this.databaseConnection = connection return this.databaseConnection }) } else { this.databaseConnectionPromise = this.driver .obtainMasterConnection() .then((connection) => { this.databaseConnection = connection return this.databaseConnection }) }
return this.databaseConnectionPromise }
|
MysqlDriver.ts
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| obtainMasterConnection(): Promise<any> { return new Promise<any>((ok, fail) => { if (this.poolCluster) { this.poolCluster.getConnection( "MASTER", (err: any, dbConnection: any) => { err ? fail(err) : ok(this.prepareDbConnection(dbConnection)) }, ) } else if (this.pool) { this.pool.getConnection((err: any, dbConnection: any) => { err ? fail(err) : ok(this.prepareDbConnection(dbConnection)) }) } else { fail( new TypeORMError( `Connection is not established with mysql database`, ), ) } }) }
|
mysql
获取连接
lib/Pool.js
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| function Pool(options) { EventEmitter.call(this); this.config = options.config; this.config.connectionConfig.pool = this;
this._acquiringConnections = []; this._allConnections = []; this._freeConnections = []; this._connectionQueue = []; this._closed = false; }
Pool.prototype.getConnection = function (cb) { if (this._closed) { var err = new Error("Pool is closed."); err.code = "POOL_CLOSED"; process.nextTick(function () { cb(err); }); return; }
var connection; var pool = this;
if (this._freeConnections.length > 0) { connection = this._freeConnections.shift(); this.acquireConnection(connection, cb); return; }
if ( this.config.connectionLimit === 0 || this._allConnections.length < this.config.connectionLimit ) { connection = new PoolConnection(this, { config: this.config.newConnectionConfig(), });
this._acquiringConnections.push(connection); this._allConnections.push(connection);
connection.connect( { timeout: this.config.acquireTimeout }, function onConnect(err) { spliceConnection(pool._acquiringConnections, connection);
if (pool._closed) { err = new Error("Pool is closed."); err.code = "POOL_CLOSED"; }
if (err) { pool._purgeConnection(connection); cb(err); return; }
pool.emit("connection", connection); pool.emit("acquire", connection); cb(null, connection); } ); return; }
if (!this.config.waitForConnections) { process.nextTick(function () { var err = new Error("No connections available."); err.code = "POOL_CONNLIMIT"; cb(err); }); return; }
this._enqueueCallback(cb); };
|
lib/PoolConnection.js
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| function PoolConnection(pool, options) { Connection.call(this, options); this._pool = pool;
if (Events.usingDomains) { this.domain = pool.domain; }
this.on("end", this._removeFromPool); this.on("error", function (err) { if (err.fatal) { this._removeFromPool(); } }); }
|
程序无响应问题
通过 queryRunner 手动管理事务,压测时遇到程序无响应问题,具体情况
- 程序可以收到请求,但是代码走到 typeorm 就无响应了
- 数据库可以正常响应其他客户端的查询请求
最小化代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| async update(data: UpdateConfigItemListDto) { const { category_id, item_list } = data;
const qr = this.dataSource.createQueryRunner()
try { await qr.connect() await qr.startTransaction()
const newItemList = await Promise.all(itemList.map(async item => { const { value, id, code } = item try { const dr = await this.handleHistoryNum(id, qr) dr && await qr.manager.update(ConfigItemValue, { id: dr.id, }, { deleted: true, }) await qr.manager.insert(ConfigItemValue, { config_item_id: id, value, category_id, created_by: payload?.user_id })
return { id, code, value } } catch (error) { this.logger.error(`update config item:${id} value:${value} error:`, error) throw error } }))
await qr.commitTransaction()
return newItemList } catch (error) { await qr.rollbackTransaction() throw new HttpErrorResponse(ApiErrorCode.MYSQL_ERROR, `更新配置异常`) } finally { await qr.release() } }
async handleHistoryNum(id: number, qr: QueryRunner) { const num = await qr.manager.getRepository(ConfigItemValue) .count({ where: { id, deleted: false } })
return row }
|
分析
可能的原因
- 数据库连接被占满
- 死锁
- 服务器或应用级别存在资源竞争
疑惑:即使连接被占满、死锁,随着时间拉长,死锁被回滚、事务执行完之后也应该释放连接,不应该一直无响应
mysql 日志
通过 SHOW ENGINE INNODB STATUS\G
看到死锁
获取连接池信息
1 2 3 4
| setInterval(() => { console.log("Pool Info ==>", this.dataSource.driver["pool"]); }, 1000);
|
waitForConnections
当连接数为 0 时,连接池的行为
true 默认
将新的连接请求放入队列中
false
立即返回异常
acquireTimeout
获取连接的超时时间,单位毫秒,默认 10000
connectionLimit
最大连接个数
代码
高并发下,数据库连接很快被占满,当连接被占满时
- 因为配置 mysql 的参数 waitForConnections 默认为 true,所以新的连接请求会被放入队列中等待
handleHistoryNum
方法中获取 num 是通过 this.dataSource
获取连接的,起初是想让查询请求放在事务外执行,但是方法中开启事务之后,所有的操作都放在事务中执行了,导致事务中还需要获取一个连接,而此时连接被占满,该连接请求被放入队列中,进而导致当前事务无法提交,连接无法释放,发生死锁
结论
- 事务中又获取连接,导致死循环,连接无法释放,程序一直卡死
- 开启事务之后,所有数据库操作都通过 qr.manager 执行
- 及时释放连接