const mysql = require('mysql'); const fs = require('fs'); const path = require('path'); let globalConfig = JSON.parse(fs.readFileSync(path.join(__dirname, '../config.json'), 'utf8')); let pool = null; function create({ database, connectionLimit = 10 }) { let config = { connectionLimit: connectionLimit, //连接数量,默认是10 ...globalConfig[global.dbConfig || 'mysql'], database: database, }; // console.log(config); //创建数据库连接池 pool = mysql.createPool(config); // //从连接池中获取连接时,将触发该事件 // pool.on('acquire', function (conn) { // console.log('获取连接', conn.threadId); // }); //在连接池中建立新连接时,将触发该事件 pool.on('connection', function (conn) { console.log('建立新连接', conn.threadId); }); // //等待可用连接时,将触发该事件 // pool.on('enqueue', function () { // console.log('等待可用连接'); // }); // //当连接释放回池中时,触发该事件 // pool.on('release', function (conn) { // console.log('连接被释放回池中', conn.threadId); // }); } async function query(sql, params) { return await new Promise(function (resolve, reject) { if (!pool) { console.error('Database connection pool is not initialized yet.'); resolve(data); return; } // pool.query()方法可以自动的帮我们在连接池中获取可用连接 pool.query(sql, params, function (err, data) { if (err) reject(err); resolve(data); }); // // 当然我们也可以手动获取可用连接 // pool.getConnection(function (err, conn) { // if (err) reject(err); // conn.query(sql, function (err, data) { // if (err) reject(err); // resolve(data); // // 连接用完之后,需要释放,重新放回连接池中。 // // 注意这里并没有销毁该连接,该连接仍然可用,但需要重新获取 // conn.release(); // }); // }); }); } // sqlParamsEntities = { sql: "", params: [], callback: function(可选) } async function transaction(sqlParamsEntities) { let connection; try { connection = await new Promise((resolve, reject) => { pool.getConnection(function (err, connection) { if (err) { reject(err); } resolve(connection); }); }); } catch (err) { console.error("获取事务connection失败", err); return; } try { return await new Promise((resolve, reject) => { // 开启事务 connection.beginTransaction(function (err) { if (err) { reject(err); } // 开始执行SQL语句 console.log("开始执行transaction,共执行" + sqlParamsEntities.length + "条数据"); sqlParamsEntities.forEach((entity) => { connection.query(entity.sql, entity.param, function (tErr, data) { if (tErr) { reject(tErr); } if (typeof entity.callback === 'function') return entity.callback(data); }) }); // 执行完毕,提交事务 connection.commit(function (tErr, info) { console.log("transaction info: " + JSON.stringify(info)); if (tErr) { reject(tErr); } resolve(info); }) }); }); } catch (err) { console.error("事务执行失败,开始回滚"); connection.rollback(function () { console.log("transaction error: " + err); }); } finally { connection.release(); } } async function close() { await new Promise((resolve, reject) => { if (!pool) return; pool.end(function (err) { console.log('关闭连接池', err); resolve(err); }); }); } module.exports = { create, query, transaction, close, }