1
0
Code Issues Pull Requests Projects Releases Wiki Activity GitHub Gitee
tools/utils/dbPoolUtils.js

140 lines
4.5 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const mysql = require('mysql');
const fs = require('fs');
const path = require('path');
let globalConfig = JSON.parse(fs.readFileSync(path.join(__dirname, '../config.json'), 'utf8'));
let pool = null;
function create({ database, connectionLimit = 10 }) {
let config = {
connectionLimit: connectionLimit, //连接数量默认是10
...globalConfig[global.dbConfig || 'mysql'],
database: database,
};
// console.log(config);
//创建数据库连接池
pool = mysql.createPool(config);
// //从连接池中获取连接时,将触发该事件
// pool.on('acquire', function (conn) {
// console.log('获取连接', conn.threadId);
// });
//在连接池中建立新连接时,将触发该事件
pool.on('connection', function (conn) {
console.log('建立新连接', conn.threadId);
});
// //等待可用连接时,将触发该事件
// pool.on('enqueue', function () {
// console.log('等待可用连接');
// });
// //当连接释放回池中时,触发该事件
// pool.on('release', function (conn) {
// console.log('连接被释放回池中', conn.threadId);
// });
}
async function query(sql, params) {
return await new Promise(function (resolve, reject) {
if (!pool) {
console.error('Database connection pool is not initialized yet.');
resolve(data);
return;
}
// pool.query()方法可以自动的帮我们在连接池中获取可用连接
pool.query(sql, params, function (err, data) {
if (err) reject(err);
resolve(data);
});
// // 当然我们也可以手动获取可用连接
// pool.getConnection(function (err, conn) {
// if (err) reject(err);
// conn.query(sql, function (err, data) {
// if (err) reject(err);
// resolve(data);
// // 连接用完之后,需要释放,重新放回连接池中。
// // 注意这里并没有销毁该连接,该连接仍然可用,但需要重新获取
// conn.release();
// });
// });
});
}
// sqlParamsEntities = { sql: "", params: [], callback: function(可选) }
async function transaction(sqlParamsEntities) {
let connection;
try {
connection = await new Promise((resolve, reject) => {
pool.getConnection(function (err, connection) {
if (err) {
reject(err);
}
resolve(connection);
});
});
} catch (err) {
console.error("获取事务connection失败", err);
return;
}
try {
return await new Promise((resolve, reject) => {
// 开启事务
connection.beginTransaction(function (err) {
if (err) {
reject(err);
}
// 开始执行SQL语句
console.log("开始执行transaction共执行" + sqlParamsEntities.length + "条数据");
sqlParamsEntities.forEach((entity) => {
connection.query(entity.sql, entity.param, function (tErr, data) {
if (tErr) {
reject(tErr);
}
if (typeof entity.callback === 'function')
return entity.callback(data);
})
});
// 执行完毕,提交事务
connection.commit(function (tErr, info) {
console.log("transaction info: " + JSON.stringify(info));
if (tErr) {
reject(tErr);
}
resolve(info);
})
});
});
} catch (err) {
console.error("事务执行失败,开始回滚");
connection.rollback(function () {
console.log("transaction error: " + err);
});
} finally {
connection.release();
}
}
async function close() {
await new Promise((resolve, reject) => {
if (!pool) return;
pool.end(function (err) {
console.log('关闭连接池', err);
resolve(err);
});
});
}
module.exports = {
create,
query,
transaction,
close,
}