Files
gitea_repo_creator/create_repo.py

322 lines
13 KiB
Python

import requests
# for pretty printing
import json
from pygments import highlight
from pygments.lexers import JsonLexer
from pygments.formatters import TerminalFormatter
import argparse
import contextlib
import tomli
import tomli_w
from dotmap import DotMap
import click
import os
import sys
import re
def get_config():
# Create an ArgumentParser object
args = get_args()
config = config_from_file(args)
update_config_from_args(args, config)
check_required_options(config)
if config._script.verbose > 0:
config.pprint()
return config
def config_from_file(args):
config = DotMap()
if args.config_file:
try:
with open(args.config_file, 'rb') as file:
config = DotMap(tomli.load(file))
except FileNotFoundError:
if args.verbose > 0:
print(f"File '{args.config_file}' not found.")
return config
def update_config_from_args(args, config):
config._script.isgit = os.path.exists(".git")
config._script.cwd = os.getcwd().split('\\')[-1]
config._script.confirm = False if args.yes else config._script.get('confirm', True)
config._script.out = args.out or config._script.get('out', None)
config._script.verbose = args.verbose or config._script.get('verbose', 0)
config._script.dryrun = args.dryrun or config._script.get('dryrun', None)
config._script.use_cwd = args.use_cwd or config._script.get('use_cwd', None)
config.repo.name = (
config._script.cwd if config._script.use_cwd else
(args.repo_name or config.repo.get('name', None))
)
config.connection.token = args.conn_token or config.connection.get('token',None)
config.connection.address = args.instance_address or config.connection.get('address',None)
config.repo.auto_init = args.repo_auto_init or config.repo.get('auto_init',None)
config.repo.default_branch = args.repo_default_branch or config.repo.get('default_branch',None)
config.repo.description = args.repo_description or config.repo.get('description',None)
config.repo.gitignores = args.repo_gitignores or config.repo.get('gitignores',None)
config.repo.issue_labels = args.repo_issue_labels or config.repo.get('issue_labels',None)
config.repo.license = args.repo_license or config.repo.get('license',None)
config.repo.object_format_name = args.repo_object_format_name or config.repo.get('object_format_name',None)
config.repo.private = args.repo_private or config.repo.get('private',None)
config.repo.readme = args.repo_readme or config.repo.get('readme',None)
config.repo.template = args.repo_template or config.repo.get('template',None)
config.repo.trust_model = args.repo_trust_model or config.repo.get('trust_model',None)
config._dynamic = False
def check_required_options(config):
if config.repo.name is None:
config.repo.name = interactive_repo_name()
if config.connection.address is None:
raise ValueError('instance address missing')
if config.connection.token is None:
raise ValueError('authorization token missing')
def interactive_repo_name(c):
for _ in range(3):
repo_name = input("Enter a name for the repository: ")
if re.match("^[a-zA-Z0-9_-]+$", repo_name):
return repo_name
print("Invalid name. Please use only ASCII letters, numbers, dashes, or underscores.")
print('too many tries, exiting')
sys.exit()
def get_args():
parser = argparse.ArgumentParser(description='Create a repository on a gitea instance')
parser.add_argument('-v', '--verbose', action='count', default=0, help='Increase output verbosity')
parser.add_argument('-d', '--dryrun', action='store_true', help='don\'t actually do anything')
parser.add_argument('-c', '--use_cwd', action='store_true', help='Use current working directory as repo_name')
parser.add_argument('-y', '--yes', action='store_true', help='don\'t ask me before creating repo')
parser.add_argument('repo_name', nargs='?', help='Name of the repository to create', default=None)
parser.add_argument('-f', '--config_file', help='Path to the TOML configuration file')
parser.add_argument('-o', '--out', help='Path to output used config to', default=None)
parser.add_argument('-t', '--conn_token', '--token', help='gitea authorization token', type=str, default=None)
parser.add_argument('-a', '--instance_address', '--address', help='gitea instance address', type=str, default=None)
argument_list = [
# ['name', 'Name of the repository to create', str, None, 'true'],
['auto_init', 'Whether the repository should be auto-initialized?', bool, None, 'false'],
['default_branch', 'DefaultBranch of the repository (used when initializes and in template)', str, None, 'false'],
['description', 'Description of the repository to create', str, None, 'false'],
['gitignores', 'Gitignores to use', str, None, 'false'],
['issue_labels', 'Label-Set to use', str, None, 'false'],
['license', 'License to use', str, None, 'false'],
['object_format_name', 'ObjectFormatName of the underlying git repository', str, ['sha1', 'sha256'], 'false'],
['private', 'Whether the repository is private', bool, None, 'false'],
['readme', 'Readme of the repository to create', str, None, 'false'],
['template', 'Whether the repository is a template', bool, None, 'false'],
['trust_model', 'TrustModel of the repository', str, ['default', 'collaborator', 'committer', 'collaboratorcommiter'], 'false']
]
for arg_info in argument_list:
parser.add_argument(f'--repo_{arg_info[0]}', f'--{arg_info[0]}', help=arg_info[1], type=arg_info[2], choices=arg_info[3])
args = parser.parse_args()
return args
def generate_header(config):
return {
'accept': 'application/json',
'Authorization': f'token {config.connection.token}',
# 'Content-Type': 'application/json', # Already added when you pass json=
}
def generate_payload(config):
return {key: value for key, value in config.repo.items() if value is not None}
def pretty_print_json(obj, title=None):
print_obj = json.loads(obj) if isinstance(obj, (bytes, str)) else obj
json_str = json.dumps(print_obj, indent=2, sort_keys=False) # prettify
if title:
print(title)
print(highlight(json_str, JsonLexer(), TerminalFormatter())) # print
def create_repo(config):
export_handler(config)
headers = generate_header(config)
json_data = generate_payload(config)
if config._script.verbose > 0:
print()
pretty_print_json(headers, title='Headers')
pretty_print_json(json_data, title='Body')
confirm_handler(config)
if not config._script.dryrun:
return requests.post(f'https://{config.connection.address}/api/v1/user/repos', headers=headers, json=json_data)
else:
print('(DRYRUN) created repo')
return None
def confirm_handler(config):
if config._script.confirm:
dryrunstr = '(DRYRUN) ' if config._script.dryrun else ''
print(f'{dryrunstr}I\'m about to create \033[1;34m{config.repo.name}\033[0m at \033[1;34m{config.connection.address}\033[0m')
if not click.confirm(f'{dryrunstr}Continue?', default=True):
print('exiting')
sys.exit()
def filter_none_values(data):
if isinstance(data, dict):
return {k: filter_none_values(v) for k, v in data.items() if v is not None}
else:
return data
def remove_keys(data, keys):
if not isinstance(keys, (list, tuple, set)):
keys = [keys] # Convert single value to a list
for key in keys:
data.pop(key, None)
return data
def export_handler(config):
if config._script.out is not None:
config_export = config.toDict()
config_export = remove_keys(config_export, '_script')
config_export = filter_none_values(config_export)
with open(config._script.out, 'wb') as outf:
tomli_w.dump(config_export, outf)
def status_handler(response, config):
if config._script.dryrun:
print('(DRYRUN) status handler')
return
match response.status_code:
case 201:
success_handler(response, config)
case 400:
print('\033[0;31m400\033[0m: APIError\n')
pretty_print_json(response.content)
case 401:
print('\033[0;31m401\033[0m: User does not exist\n')
case 409:
print(f'\033[0;31mError\033[0m: A repository named \033[1;34m{config.repo.name}\033[0m already exists for the user\n')
case 422:
print('\033[0;31m422\033[0m: APIValidationError\n')
pretty_print_json(response.content)
case _:
pretty_print_json(response.content)
print(f'{response.status_code}: good job, you found an unspecified status code. You can report a bug at https://github.com/go-gitea/gitea/issues/new/choose')
dump_response(response)
def dump_response(response):
class DumpEncoder(json.JSONEncoder):
def default(self, obj):
try:
if isinstance(obj, (dict, list, tuple)):
return self.serialize_nested(obj)
return super().default(obj)
except TypeError:
return self.type_error_handler(obj)
def serialize_nested(self, obj):
if isinstance(obj, dict):
return {k: self.serialize_nested(v) for k, v in obj.items()}
elif isinstance(obj, (list, tuple)):
return [self.serialize_nested(item) for item in obj]
else:
return self.default(obj)
def type_error_handler(self, obj):
with contextlib.suppress(TypeError):
return self.default(json.loads(obj)) # first try interpreting it as json (load from bytes, try to serialize it)
try:
return self.default(vars(obj)) # then try converting it to a dict and serializing it
except (TypeError, json.decoder.JSONDecodeError):
return str(type(obj)) # if everything fails, just return the type
print('I\'m writing the response to response.json in your current directory')
print('This file contains sensitive information, such as your authorization token. Keep that in mind and redact the file before sharing.')
with open("response.json", mode="w") as d:
json.dump(vars(response), d, cls=DumpEncoder, indent=4, sort_keys=True)
def success_handler(response, config):
repository = DotMap(json.loads(response.content))
print('Success!')
message = f'You can find your new repository at \033[1;34m{repository.html_url}\033[0m\n'
if config._script.isgit:
message += success_handler_existing_repo(repository, config)
else:
message += success_handler_new_repo(repository, config)
print(message)
def success_handler_existing_repo(repository, config=None):
message_lines = [
'',
'To push your current repo, run the following commands:',
'',
f'git remote add origin {repository["ssh_url"]}',
'git push -u origin main',
'',
]
return '\n'.join(message_lines)
def success_handler_new_repo(repository, config=None):
if len(os.listdir(os.getcwd())) >= 0:
message_lines = nonempty_dir_method(repository)
else:
message_lines = empty_dir_message(repository)
return '\n'.join(message_lines)
def nonempty_dir_method(repository):
return [
'',
'Current directory is not empty. To use the files in your new repo, run the following commands',
'',
'git init',
f'git remote add origin {repository["ssh_url"]}',
'git pull origin main',
'git add [your files]'
'',
]
def empty_dir_message(repository):
message_lines = [
'',
'To create a new repository in your current directory, run the following commands:',
''
]
if os.name == 'nt':
message_lines.append('echo $null >> README.md')
else:
message_lines.append('touch README.md')
message_lines.extend(
(
'git init',
'git checkout -b main',
'git add README.md',
'git commit -m "first commit"',
f'git remote add origin {repository["ssh_url"]}',
'git push -u origin main',
'',
)
)
return message_lines
def main():
try:
config = get_config()
resp = create_repo(config)
status_handler(resp, config)
except KeyboardInterrupt:
sys.exit()
if __name__ == '__main__':
main()