assistant 不同时操作多张表,减少死锁概率
This commit is contained in:
parent
0b887580fc
commit
e638b09313
@ -13,62 +13,68 @@ function getDiffSet(a, b) {
|
||||
}
|
||||
|
||||
async function migrateIdsFromCheckToFetch(tableName, fieldName, insertSql = null) {
|
||||
// console.log(`更新待爬取列表: ${tableName}`);
|
||||
try {
|
||||
// console.log(`更新待爬取列表: ${tableName}`);
|
||||
|
||||
let stepLength = 5000;
|
||||
while (true) {
|
||||
// 从 check 表中分块查出待处理数据
|
||||
let idsResult = await dbUtils.query(`SELECT id FROM wait_check_${tableName} LIMIT ${stepLength}`, []);
|
||||
let ids = idsResult.map(row => row.id);
|
||||
// console.log("ids", ids);
|
||||
if (ids.length == 0) {
|
||||
// console.log(`${tableName} done.`);
|
||||
break;
|
||||
};
|
||||
let stepLength = 5000;
|
||||
while (true) {
|
||||
// 从 check 表中分块查出待处理数据
|
||||
let idsResult = await dbUtils.query(`SELECT id FROM wait_check_${tableName} LIMIT ${stepLength}`, []);
|
||||
let ids = idsResult.map(row => row.id);
|
||||
// console.log("ids", ids);
|
||||
if (ids.length == 0) {
|
||||
// console.log(`${tableName} done.`);
|
||||
break;
|
||||
};
|
||||
|
||||
// 查询出已处理的数据
|
||||
let skipIdsResult = await dbUtils.query(`SELECT ${fieldName} as id FROM ${tableName} WHERE ${fieldName} IN ?`, [[ids]]);
|
||||
let skipIds = skipIdsResult.map(row => row.id);
|
||||
// console.log("skipIds", skipIds);
|
||||
// 查询出已处理的数据
|
||||
let skipIdsResult = await dbUtils.query(`SELECT ${fieldName} as id FROM ${tableName} WHERE ${fieldName} IN ?`, [[ids]]);
|
||||
let skipIds = skipIdsResult.map(row => row.id);
|
||||
// console.log("skipIds", skipIds);
|
||||
|
||||
// 剩余要爬取的数据
|
||||
let finalIds = getDiffSet(ids, skipIds);
|
||||
// console.log("finalIds", finalIds);
|
||||
// 剩余要爬取的数据
|
||||
let finalIds = getDiffSet(ids, skipIds);
|
||||
// console.log("finalIds", finalIds);
|
||||
|
||||
// 插入待爬取列表
|
||||
if (finalIds.length > 0) {
|
||||
var result = await dbUtils.query(insertSql ? insertSql : `INSERT IGNORE INTO wait_fetch_${tableName} (id) VALUES ?`, [finalIds.map(id => [id])]);
|
||||
// console.log(result);
|
||||
// 插入待爬取列表
|
||||
if (finalIds.length > 0) {
|
||||
var result = await dbUtils.query(insertSql ? insertSql : `INSERT IGNORE INTO wait_fetch_${tableName} (id) VALUES ?`, [finalIds.map(id => [id])]);
|
||||
// console.log(result);
|
||||
}
|
||||
|
||||
// 从待检查表中删除
|
||||
if (ids.length > 0)
|
||||
await dbUtils.query(`DELETE FROM wait_check_${tableName} WHERE id IN ?`, [[ids]]);
|
||||
console.log(`table: ${tableName}\t| ${fill(ids[0], 10)} - ${fill(ids.slice(-1)[0], 10)} ${fill(`(${finalIds.length}/${ids.length})`, 10, ' ', true)}\t| affected: ${result?.affectedRows}`);
|
||||
}
|
||||
|
||||
|
||||
// 从待检查表中删除
|
||||
if (ids.length > 0)
|
||||
await dbUtils.query(`DELETE FROM wait_check_${tableName} WHERE id IN ?`, [[ids]]);
|
||||
console.log(`table: ${tableName}\t| ${fill(ids[0], 10)} - ${fill(ids.slice(-1)[0], 10)} ${fill(`(${finalIds.length}/${ids.length})`, 10, ' ', true)}\t| affected: ${result?.affectedRows}`);
|
||||
} catch (e) {
|
||||
console.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
async function getPromise(tableName, fieldName, insertSql) {
|
||||
try {
|
||||
return new Promise(async function (resolve) {
|
||||
await migrateIdsFromCheckToFetch(tableName, fieldName, insertSql);
|
||||
resolve();
|
||||
});
|
||||
} catch (e) {
|
||||
console.error(e);
|
||||
await sleepUtils.sleep(10 * 1000);
|
||||
}
|
||||
return new Promise(async function (resolve) {
|
||||
await migrateIdsFromCheckToFetch(tableName, fieldName, insertSql);
|
||||
resolve();
|
||||
});
|
||||
}
|
||||
async function updateWaitTable() {
|
||||
console.log(`更新待爬取列表`);
|
||||
await Promise.all([
|
||||
getPromise("song", "song_id"),
|
||||
getPromise("lyric", "song_id"),
|
||||
getPromise("comment", "song_id", `INSERT IGNORE INTO comment_progress (song_id) VALUES ?`),
|
||||
getPromise("album", "album_id"),
|
||||
getPromise("artist", "artist_id")
|
||||
]);
|
||||
|
||||
// 不同时操作多张表,减少死锁概率
|
||||
await migrateIdsFromCheckToFetch("song", "song_id");
|
||||
await migrateIdsFromCheckToFetch("lyric", "song_id");
|
||||
await migrateIdsFromCheckToFetch("comment", "song_id", `INSERT IGNORE INTO comment_progress (song_id) VALUES ?`);
|
||||
await migrateIdsFromCheckToFetch("album", "album_id");
|
||||
await migrateIdsFromCheckToFetch("artist", "artist_id");
|
||||
|
||||
// await Promise.all([
|
||||
// getPromise("song", "song_id"),
|
||||
// getPromise("lyric", "song_id"),
|
||||
// getPromise("comment", "song_id", `INSERT IGNORE INTO comment_progress (song_id) VALUES ?`),
|
||||
// getPromise("album", "album_id"),
|
||||
// getPromise("artist", "artist_id")
|
||||
// ]);
|
||||
console.log("All done.\n");
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user