1
0
mirror of https://gitee.com/coder-xiaomo/gitee2github synced 2025-09-10 05:51:40 +08:00
Code Issues Projects Releases Wiki Activity GitHub Gitee

单一py文件拆分为py文件模块,功能完成

This commit is contained in:
2022-03-17 01:16:26 +08:00
parent 91268f3bba
commit 876936e036
12 changed files with 685 additions and 377 deletions

13
assets/common.py Normal file
View File

@@ -0,0 +1,13 @@
import os
import json
def saveJSON(data, filename):
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
def readJSON(filename):
with open(filename, 'r', encoding='utf-8') as f:
return json.load(f)
def fileExists(filename):
return os.path.exists(filename)

View File

@@ -0,0 +1,20 @@
import signal
def defineWorkingDirCleanFunction(saveJSONFunction):
"""
定义Ctrl+C退出时 工作目录清理函数
"""
def cleanWhenExit(signum, frame):
print()
print('程序正在清理工作目录,准备退出...')
os.rmdir(WorkingDir)
print('完成!')
print()
for countdown in range(3, 0, -1):
print('\r' + "{} 秒后退出{}".format(countdown, '.' * (4 - countdown)), end='')
time.sleep(1)
exit()
# 注册信号处理函数
signal.signal(signal.SIGINT, cleanWhenExit)
signal.signal(signal.SIGTERM, cleanWhenExit)
print("[info] 工作目录清理函数准备完成")

94
assets/giteeOauth.py Normal file
View File

@@ -0,0 +1,94 @@
import webbrowser
import requests
import time
def giteeOauth(GiteeClientID, GiteeClientSecret, giteeProxies, timeout):
print("################################ Gitee 授权 ################################")
input("按回车开始进行Gitee账号授权")
# ######################################## 获取Gitee用户的 access_token ########################################
# Api文档 https://gitee.com/api/v5/oauth_doc#/
# 认证地址
oauth_url = 'https://gitee.com/oauth/authorize?client_id={ClientID}&redirect_uri={redirect_uri}&response_type=code&scope=user_info%20projects' \
.format(ClientID=GiteeClientID, redirect_uri='https://www.only4.work/appHelper/gitee_show_code_param.php')
# 打开浏览器让用户授权
webbrowser.open(oauth_url)
# 让用户手动输入授权码
print("请在新打开的页面进行授权,授权完成后,请将授权码拷贝到此处,然后按回车继续")
code = input("您的授权码:")
r = None # 定义变量
tryTime = 10
while(tryTime > 0):
tryTime = tryTime - 1
try:
print("正在获取Gitee access_token...")
r = requests.post('https://gitee.com/oauth/token', params = {
"grant_type": "authorization_code",
"code": code,
"client_id": GiteeClientID,
"redirect_uri": "https://www.only4.work/appHelper/gitee_show_code_param.php",
"client_secret": GiteeClientSecret,
}, proxies = giteeProxies, timeout = timeout)
break
except Exception as err:
print(err)
print("出错啦,正在重试,还剩{}次机会".format(tryTime))
time.sleep(1)
continue
print("[info] Gitee账号授权完成")
# print(r.text)
access_token = r.json()['access_token']
print("您的access_token为" + access_token)
# {"access_token":"83b5de53fc28079d80830983be587774","token_type":"bearer","expires_in":86400,"refresh_token":"6d445558db02909f8755383193bfb87648b897b6f92654e4f5d3e6ad61b0d515","scope":"user_info projects","created_at":1647013348}
# ######################################## 获得用户仓库信息 ########################################
# Api文档 https://gitee.com/api/v5/swagger#/getV5UserRepos
tryTime = 10
while(tryTime > 0):
tryTime = tryTime - 1
try:
r = requests.get('https://gitee.com/api/v5/user/repos', params = {
"access_token": access_token,
"visibility": "all",
"sort": "full_name",
"page": 1,
"per_page": 100,
}, proxies = giteeProxies, timeout = timeout)
break
except Exception as err:
print(err)
print("出错啦,正在重试,还剩{}次机会".format(tryTime))
time.sleep(1)
continue
# print(r.text)
print()
print("################################ 成功获取到您的仓库信息 ################################")
giteeRepos = {}
for repo in r.json():
print(repo['human_name'])
print(repo['html_url'])
print(repo['full_name'])
print()
# giteeRepos.append()
giteeRepos[repo['full_name']] = {
'human_name': repo['human_name'],
'html_url': repo['html_url'],
'ssh_url': repo['ssh_url'],
'full_name': repo['full_name'],
# 'path': repo['path'], # 仓库的路径 如 /only4/appHelper 的 appHelper
'name': repo['name'], # 仓库名称 如 仓库同步助手
# 'public': repo['public'], # 是否公开
'private': repo['private'], # 是否私有
# 'internal': repo['internal'], # 是否内部仓库
# 'empty_repo': repo['empty_repo'], # 是否为空仓库
'pushed_at': repo['pushed_at'], # 最后提交时间
}
print("[info] Gitee仓库读取完成")
return [r.json(), giteeRepos]

99
assets/githubOauth.py Normal file
View File

@@ -0,0 +1,99 @@
import webbrowser
import requests
import time
def githubOauth(GitHubClientID, GitHubClientSecret, githubProxies, timeout):
print("################################ GitHub 授权 ################################")
input("按回车开始进行GitHub账号授权")
# ######################################## 获取GitHub用户的 access_token ########################################
# Api文档
# https://docs.github.com/cn/developers/apps/building-oauth-apps/authorizing-oauth-apps
# https://docs.github.com/cn/developers/apps/building-oauth-apps/scopes-for-oauth-apps
# 认证地址
oauth_url = 'https://github.com/login/oauth/authorize?client_id={ClientID}&redirect_uri={redirect_uri}&response_type=code&scope=user,repo' \
.format(ClientID=GitHubClientID, redirect_uri='https://www.only4.work/appHelper/github_show_code_param.php')
# 打开浏览器让用户授权
webbrowser.open(oauth_url)
# 让用户手动输入授权码
print("请在新打开的页面进行授权,授权完成后,请将授权码拷贝到此处,然后按回车继续")
code = input("您的授权码:")
r = None # 定义变量
tryTime = 10
while(tryTime > 0):
tryTime = tryTime - 1
try:
r = requests.post('https://github.com/login/oauth/access_token', headers = {
'Accept': 'application/json'
}, params = {
"code": code,
"client_id": GitHubClientID,
"redirect_uri": "https://www.only4.work/appHelper/github_show_code_param.php",
"client_secret": GitHubClientSecret,
}, proxies = githubProxies, timeout = timeout)
break
except Exception as err:
print(err)
print("出错啦,正在重试,还剩{}次机会".format(tryTime))
time.sleep(1)
continue
print("[info] GitHub账号授权完成")
# print(r.text)
access_token = r.json()['access_token']
print("您的access_token为" + access_token)
# {"access_token":"ghu_NgIJEFtrQz4FtTqfewVaHlR9Xnb30R26oMwM","expires_in":28800,"refresh_token":"ghr_mu8iw6A33ae1AoIo3hMFVX7VssbPmGIlfSKyc2CTQIPootRSMnr48c3WVevQpYfwLL9MaQ0vWvTR","refresh_token_expires_in":15897600,"token_type":"bearer","scope":""}
# ######################################## 获得用户仓库信息 ########################################
tryTime = 10
while(tryTime > 0):
tryTime = tryTime - 1
try:
# Api文档 https://docs.github.com/cn/rest/reference/repos#list-repositories-for-the-authenticated-user
r = requests.get('https://api.github.com/user/repos', headers = {
'Accept': 'application/vnd.github.v3+json',
"Authorization": "token " + access_token,
# 👆 https://developer.github.com/changes/2020-02-10-deprecating-auth-through-query-param/
}, params = {
"access_token": access_token,
"visibility": "all",
"sort": "full_name",
"page": 1,
"per_page": 100,
}, proxies = githubProxies, timeout = timeout)
break
except Exception as err:
print(err)
print("出错啦,正在重试,还剩{}次机会".format(tryTime))
time.sleep(1)
continue
# print(r.text)
print()
print("################################ 成功获取到您的仓库信息 ################################")
githubRepos = {}
for repo in r.json():
print(repo['full_name'])
print(repo['html_url'])
# githubRepos.append()
githubRepos[repo['full_name']] = {
# 'human_name': repo['human_name'],
'html_url': repo['html_url'],
'ssh_url': repo['ssh_url'],
'full_name': repo['full_name'],
# 'path': repo['path'], # 仓库的路径 如 /only4/appHelper 的 appHelper
'name': repo['name'], # 仓库名称 如 仓库同步助手
# 'public': repo['public'], # 是否公开
'private': repo['private'], # 是否私有
# 'internal': repo['internal'], # 是否内部仓库
# 'empty_repo': repo['empty_repo'], # 是否为空仓库
'pushed_at': repo['pushed_at'], # 最后提交时间
}
print()
print("[info] GitHub仓库读取完成")
return [r.json(), githubRepos]

View File

@@ -0,0 +1,60 @@
import os;
import shutil;
def prepareWorkingDir(CurrentDir, WorkingDir):
"""
准备工作目录
"""
print("################################ 正在准备工作目录 ################################")
print()
print("当前目录", CurrentDir) # 获取当前文件所在目录
print("工作目录", WorkingDir)
# print(os.getcwd()) # 获取当前脚本运行目录
if os.path.exists(WorkingDir):
pass
# # 工作目录已存在,如果非空就创建
# if not os.listdir(WorkingDir):
# print("工作目录已存在且为空,无需创建")
# else:
# # print('工作目录已存在且非空,请检查是否有其他程序正在使用,如果确认无其他程序在使用,请手动删除工作目录,然后重试!')
# # print()
# # print("是否删除工作目录中的文件? (Y: 删除, N: 不删除)")
# # userInput = ''
# # while userInput == '':
# # userInput = input("\r>").strip().lower ()
# # if userInput in ['y', 'yes']:
# # # os.rmdir(WorkingDir) # 只能删除空文件夹
# # shutil.rmtree(WorkingDir) #递归删除文件夹
# # os.mkdir(WorkingDir)
# # print("成功清空工作目录", WorkingDir)
# # else:
# # input('按回车键退出...')
# # exit()
# print()
# print('工作目录已存在且非空,是否在上次同步的基础上继续?')
# while True:
# userInput = input("y: 继续, n: 清空工作目录 (y): ").strip().lower ()
# if userInput in ['', 'y', 'yes']:
# break
# elif userInput in ['n', 'no']:
# # os.rmdir(WorkingDir) # 只能删除空文件夹
# shutil.rmtree(WorkingDir) #递归删除文件夹
# os.mkdir(WorkingDir)
# print("成功清空工作目录", WorkingDir)
# break
# else:
# input('按回车键退出...')
# exit()
else:
# 工作目录不存在,创建该目录
os.mkdir(WorkingDir)
print("成功创建工作目录", WorkingDir)
print("[info] 工作目录准备完成")
# os.mkdir(WorkingDir)
# os.makedirs(WorkingDir)
# os.system("chdir")
# print(__file__)

13
assets/readConfigFile.py Normal file
View File

@@ -0,0 +1,13 @@
import configparser
def readConfigFile(configFilePath):
# 可以不考虑配置文件不存在的情况
# 实例化configParser对象
config = configparser.ConfigParser()
# -read读取ini文件
config.read(configFilePath, encoding='UTF-8') # GB18030
# -sections得到所有的section并以列表的形式返回
# configSections = config.sections()
return config
print("[info] 配置文件读取完成")

111
assets/repoMatching.py Normal file
View File

@@ -0,0 +1,111 @@
import copy
def repoMatching(repo1, repo2):
# 对字典进行深拷贝
repo1 = copy.deepcopy(repo1)
repo2 = copy.deepcopy(repo2)
# 配对字典
matchList = {
'match': [],
'mismatch': {
'repo1': [],
'repo2': [],
},
}
# 定义映射决策
from .common import readJSON, fileExists
if fileExists("mapping.json"):
mapping = readJSON("mapping.json")
orgNameMap = mapping.get('orgNameMap')
repoNameMap = mapping.get('repoNameMap')
fullNameMap = mapping.get('fullNameMap')
else:
mapping = None # 不需要映射
def mapRepoName(fullName):
if mapping == None:
return fullName
elif fullName in fullNameMap.keys():
return fullNameMap.get(fullName)
else:
[orgName, repoName] = fullName.split('/')
if orgName in orgNameMap.keys():
orgName = orgNameMap.get(orgName)
if repoName in repoNameMap.keys():
repoName = repoNameMap.get(repoName)
return orgName + "/" + repoName
# # 测试
# print(mapRepoName("only4/thisisarepo"))
# print(mapRepoName("only-4/this-is-a-repo"))
# print(mapRepoName("only4/this-is-a-repo"))
# print(mapRepoName("only-4/thisisarepo"))
# print(mapRepoName("only4/repo1"))
# exit()
# def judgeRepoEqual(name1, name2):
# if mapping == None: # 没有配置映射
# return name1 == name2
# else:
# for i in fullNameMap: # 比较 full_name 映射 比如 org1/repo1 -> org2/repo2
# if name1 == i[0]:
# if name2 == i[1]:
# return True # 如果 name1 和 name2 在 full_name 映射中,则匹配
# else:
# return False # 如果 name1 在 full_name 映射中,但是 name2 不在,则不匹配
# # 接下来同时比较 org repo 映射,将 org1 repo1进行映射如果 name2 与 name1+映射)相等,则匹配
# [org1, repo1] = name1.split('/')
# [org2, repo2] = name2.split('/')
# for i in orgNameMap: # 将 org1 进行映射 备注如果不存在映射则相当于始终没有进这个for循环中的if条件
# if org1 == i[0]:
# org1 = i[1]
# break
# if org1 != org2: # 如果 org1 和 org2 不相等,则一定不匹配
# return False
# # org1 与 org2 映射后相等,现在比较 repo1 和 repo2
# for i in repoNameMap: # 将 repo1 进行映射 备注如果不存在映射则相当于始终没有进这个for循环中的if条件
# if repo1 == i[0]:
# repo1 = i[1]
# break
# if org1 != org2: # 如果 org1 和 org2 不相等,则一定不匹配
# return False
# return True
# # 测试
# print(judgeRepoEqual("only4/thisisarepo", "only-4/this-is-a-repo")) # True
# print(judgeRepoEqual("only4/thisisarepo", "only4/this-is-a-repo")) # False
# print(judgeRepoEqual("only4/thisisarepo", "only-4/thisisarepo")) # True
# print(judgeRepoEqual("only4/thisisarepo", "only4/thisisarepo")) # False
# print(judgeRepoEqual("only4/repo1", "only5/repo2")) # True
# exit()
for repo1FullName in repo1.keys(): # 遍历repo1仓库名
# print(repo1FullName)
repo1FullName_map = mapRepoName(repo1FullName)
if(repo1FullName_map in repo2.keys()): # 如果在repo2中则说明匹配成功
matchList.get('match').append({
'from': repo1[repo1FullName],
'to': repo2[repo1FullName_map],
})
del repo2[repo1FullName_map]
else:
matchList.get('mismatch')['repo1'].append(repo1[repo1FullName])
matchList.get('mismatch')['repo2'] = list(repo2.values())
return matchList
def printMatchintInfo(matchList, repo1Name = "repo1", repo2Name = "repo2"):
print("################## 以下仓库不会同步 ################## ")
print("{repo1Name} 有但是 {repo2Name} 没有的仓库".format(repo1Name = repo1Name, repo2Name = repo2Name))
for i in matchList.get('mismatch').get('repo1'):
print(' ' + i['html_url'])
print()
print("{repo2Name} 有但是 {repo1Name} 没有的仓库".format(repo1Name = repo1Name, repo2Name = repo2Name))
for i in matchList.get('mismatch').get('repo2'):
print(' ' + i['html_url'])
print()
print("################## 以下仓库会被同步 ################## ")
print("{repo1Name}{repo2Name} 匹配相同的仓库".format(repo1Name = repo1Name, repo2Name = repo2Name))
for i in matchList.get('match'):
print(' ' + i.get("from").get("full_name").ljust(30,' ') + " -> " + i.get("to").get("full_name"))
print()

86
assets/transferRepos.py Normal file
View File

@@ -0,0 +1,86 @@
import os
def transferRepos(matchList, WorkingDir, fromRepoProtocol = 'https', toRepoProtocol = 'https'):
# print(matchList)
# 切换路径
targetPath = os.path.abspath(WorkingDir)
os.chdir(targetPath)
if(os.path.abspath(os.getcwd()) != targetPath): # 展开为绝对路径,然后进行比较
print("[error] 切换路径失败")
print("当前路径", os.path.abspath(os.getcwd()))
print("想要切换到的路径", targetPath)
input("按回车键退出...")
exit()
commands = []
commands.append("@echo off") # echo off
# commands.append("cls") # 清屏
# commands.append('')
for repo in matchList:
# 查看当前目录
# commands.append('echo 当前目录')
# commands.append('chdir')
# 克隆仓库
localRepoFolder = repo['from']['full_name'].split('/')[-1] + ".git"
if not os.path.exists(WorkingDir + "/" + localRepoFolder):
print(WorkingDir + "/" + localRepoFolder)
repo_url = repo['from']['html_url']
if toRepoProtocol != 'https':
repo_url = repo['from']['ssh_url']
# commands.append('echo 克隆仓库')
commands.append("git clone --mirror {repo_url}".format(repo_url = repo_url))
# 切换到仓库目录
# commands.append('echo 切换到仓库目录')
commands.append("cd {folder_name}".format(folder_name = localRepoFolder))
# 更新本地仓库
# 不可以使用 git fetch --all 如果仓库中有hidden ref则推送时会报错
# commands.append('echo 更新本地仓库')
commands.append("git remote update")
# 本地存储库GC (没有必要)
# commands.append("git gc")
# 同步仓库
repo_url = repo['to']['html_url']
if toRepoProtocol != 'https':
repo_url = repo['to']['ssh_url']
# commands.append('echo 推送仓库到远程({repo_url}'.format(repo_url = repo_url))
commands.append("git push --mirror {repo_url}".format(repo_url = repo_url))
# 切换回上一级目录
# commands.append('echo 回到工作目录')
commands.append("cd ../")
# commands.append('echo 当前仓库克隆完成,等待用户确认,按任意键进行下一步操作 & pause')
# commands.append("pause")
# 空行
commands.append('')
commands.append('echo 命令执行完成')
commands.append("pause")
print("本项目还处于测试阶段,出于安全考虑,我们采用生成命令文件的方式对仓库进行操作,以免",
"由于脚本错误造成数据丢失。我们强烈建议您在继续前先手动备份您的仓库,以免丢失代码。",
"由于代码错误或您自己失误造成的代码仓库丢失,项目开发者不承担责任。在执行脚本前,请",
"务必确认您知晓该行命令的执行结果,切勿盲目执行您不知道的命令!", sep = "\n")
print("\033[1;37;41m继续前请务全量必备份仓库\033[0m")
print("\033[1;37;41m继续前请务全量必备份仓库\033[0m")
print("\033[1;37;41m继续前请务全量必备份仓库\033[0m")
print("继续操作代表您已阅读上述内容,程序将在工作目录下生成一个批处理脚本")
if input("按<回车>键继续或输入run直接执行(不推荐): ") == "run":
for commandForExecute in commands:
print("[正在执行]", commandForExecute)
os.system(commandForExecute)
else:
commandTxtPath = os.path.abspath(WorkingDir + "/commands.bat")
f=open(commandTxtPath, "w")
f.write('\n'.join(commands))
f.close()
print("命令文件生成完毕,请查看:", commandTxtPath)
# for command in commands:
# print(command)
# os.system(command)
# :: 创建文件夹
# mkdir D:\gitTransTempDir
# :: 如果之前有没删除的话就删除
# rd /s /q ./chrome-extension.git
# rd /s /q D:\gitTransTempDir