462 lines
15 KiB
Python
Executable File
462 lines
15 KiB
Python
Executable File
#! /usr/bin/env python3
|
|
#
|
|
# https://pypi.org/project/argcomplete/#global-completion
|
|
# PYTHON_ARGCOMPLETE_OK
|
|
|
|
"""This script provides tooling to administrate the github.com/zeek organization."""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
from enum import IntEnum
|
|
from pathlib import Path
|
|
|
|
# This is the high-level error for API problems, per
|
|
# https://github.com/fastai/ghapi/issues/138 and confirmed
|
|
# in manual testing
|
|
from urllib.error import HTTPError
|
|
|
|
|
|
def print_err(*args, **kwargs):
    """Forward all arguments to print(), sending the output to sys.stderr."""
    print(*args, file=sys.stderr, **kwargs)
|
|
|
|
|
|
try:
|
|
import yaml
|
|
except ImportError:
|
|
# PyYAML is optional; we will see below whether we need it as we retrieve
|
|
# the auth token.
|
|
pass
|
|
|
|
try:
|
|
from ghapi.all import GhApi, paged
|
|
except ImportError:
|
|
print_err("This requires the Python Github API package: https://ghapi.fast.ai")
|
|
sys.exit(1)
|
|
|
|
try:
|
|
# Argcomplete provides command-line completion for users of argparse.
|
|
# We support it if available, but don't complain when it isn't.
|
|
import argcomplete
|
|
except ImportError:
|
|
pass
|
|
|
|
GITHUB_SERVER = "github.com"
GITHUB_ORG = "zeek"


# Github's UI and its API do not name access levels the same way. The names
# below are the UI ones; helper functions translate to and from the API names.
# The only differences are "read" (API: "pull") and "write" (API: "push").
class AccessLevel(IntEnum):
    """Repository access levels as named in the Github UI, lowest to highest."""

    READ = 1
    TRIAGE = 2
    WRITE = 3
    MAINTAIN = 4
    ADMIN = 5
|
|
|
|
|
|
def get_api_token(args):
    """Return the Github API token to use, exiting when none can be found.

    The sources are tried in this order: the --auth-token argument, the
    GITHUB_TOKEN environment variable, and finally the "oauth_token" stored
    for GITHUB_SERVER in the gh tool's hosts.yml.

    Args:
        args: the parsed command line; only its auth_token attribute is read.

    Returns:
        The token string.

    Exits with status 1, after explaining the problem on stderr, when no
    token is available or the gh configuration cannot be used.
    """
    if args.auth_token:
        return args.auth_token

    # This is the ghapi-recommended way to configure your token and works
    # transparently:
    token = os.getenv("GITHUB_TOKEN")
    if token:
        return token

    # Try somewhat hard to locate gh's config file. An empty XDG_CONFIG_HOME is
    # treated like an unset one, so we never end up with a path that is
    # relative to the current directory.
    config_home = os.getenv("XDG_CONFIG_HOME") or Path.home() / ".config"
    gh_host_config = Path(config_home) / "gh" / "hosts.yml"

    if gh_host_config.is_file():
        if "yaml" not in sys.modules:
            print_err(
                "This requires PyYAML to use your existing gh config. See: https://pyyaml.org"
            )
            print_err(
                "Please set the GITHUB_TOKEN environment variable to your Github API token."
            )
            sys.exit(1)

        try:
            with open(gh_host_config, encoding="utf-8") as stream:
                ymldata = yaml.safe_load(stream)
            # safe_load() yields None for an empty file and may yield a
            # non-mapping for unexpected content; subscripting those raises
            # TypeError, which is handled below like a missing key.
            return ymldata[GITHUB_SERVER]["oauth_token"]
        except (OSError, UnicodeError, yaml.YAMLError, KeyError, TypeError) as error:
            print_err(f"Unexpected gh configuration ({error}).")
            print_err(
                "Please set the GITHUB_TOKEN environment variable to your Github API token."
            )
            sys.exit(1)

    print_err(
        "Please provide a Github API token via --auth-token or set the GITHUB_TOKEN environment variable"
    )
    sys.exit(1)
|
|
|
|
|
|
def get_access_level_string(perms):
    """Return the lower-case name of the access level for the given
    permissions, or None when get_access_level() identifies no level.
    """
    level = get_access_level(perms)
    return level.name.lower() if level else None
|
|
|
|
|
|
def access_level_to_github(level):
    """Translate an AccessLevel into the name the Github API uses for it.

    Returns None when the given level matches none of the known ones.
    """
    api_names = (
        (AccessLevel.ADMIN, "admin"),
        (AccessLevel.MAINTAIN, "maintain"),
        (AccessLevel.WRITE, "push"),
        (AccessLevel.TRIAGE, "triage"),
        (AccessLevel.READ, "pull"),
    )
    for candidate, api_name in api_names:
        if level == candidate:
            return api_name
    return None
|
|
|
|
|
|
def access_level_from_github(level):
    """Translate a Github API permission name into the matching AccessLevel.

    Returns None when the given name is not one of the API's permission names.
    """
    # Pairs of (API name, AccessLevel member name).
    member_names = (
        ("admin", "ADMIN"),
        ("maintain", "MAINTAIN"),
        ("push", "WRITE"),
        ("triage", "TRIAGE"),
        ("pull", "READ"),
    )
    for api_name, member in member_names:
        if level == api_name:
            return AccessLevel[member]
    return None
|
|
|
|
|
|
def get_access_level(perm):
    """Returns the access level for a given user/team as shown on the access
    settings page of a repo, based on the given set of permissions. This returns
    an AccessLevel, or None when no access level was identifiable.

    The permissions may be given as an API permission string (such as "push")
    or as an object with individual boolean permission attributes (admin,
    maintain, push, triage, pull).
    """
    # This is a bit messy because permissions are presented differently in
    # different corners of the API. Sometimes it's a string, sometimes an
    # object with individual permission bits. For now just check if it's a
    # string and handle that separately.
    if isinstance(perm, str):
        return access_level_from_github(perm)

    # Probe the bits from highest to lowest level. Each bit is looked up on its
    # own, so an object that lacks some of them (say, "maintain") still
    # resolves to the highest level it does grant instead of to None.
    if getattr(perm, "admin", False):
        return AccessLevel.ADMIN
    if getattr(perm, "maintain", False):
        return AccessLevel.MAINTAIN
    if getattr(perm, "push", False):
        return AccessLevel.WRITE
    if getattr(perm, "triage", False):
        return AccessLevel.TRIAGE
    if getattr(perm, "pull", False):
        return AccessLevel.READ

    return None
|
|
|
|
|
|
def get_all_repos(api):
    """Fetch every repo of the GITHUB_ORG and return them as a list ordered
    by repo name.
    """
    pages = paged(api.repos.list_for_org, org=GITHUB_ORG, per_page=100)
    every_repo = [repo for page in pages for repo in page]
    return sorted(every_repo, key=lambda repo: repo.name)
|
|
|
|
|
|
def cmd_repos_list_access(api, args):
    """Print a JSON list with one entry per repo, naming the teams and the
    directly added users that have access, each with its access level.

    Always returns True; args is not used.
    """
    report = []

    for repo in get_all_repos(api):
        repo_teams = api.repos.list_teams(repo.name)
        team_entries = [
            {
                "name": team.name,
                "access": get_access_level_string(team.permissions),
            }
            for team in sorted(repo_teams, key=lambda t: t.name)
        ]

        # With "direct" we only get users added to the repo themselves, not
        # the members of teams that have access to it:
        collaborators = api.repos.list_collaborators(repo.name, affiliation="direct")
        user_entries = [
            {
                "name": user.login,
                "access": get_access_level_string(user.permissions),
            }
            for user in sorted(collaborators, key=lambda u: u.login)
        ]

        report.append(
            {
                "repo": repo.name,
                "teams": team_entries,
                "users": user_entries,
            }
        )

    print(json.dumps(report, sort_keys=True))
    return True
|
|
|
|
|
|
def cmd_repos_list_redundant_users(api, args):
    """Print a JSON list of users whose direct access to a repo is redundant.

    A directly added user is redundant when a team with access to the same
    repo includes that user and grants at least the user's access level. One
    entry (repo, user, and the covering team as "redundancy") is reported per
    covering team.

    Always returns True; args is not used.
    """
    # NOTE(review): the team, member, and collaborator listings below are not
    # paginated, unlike the repo listing in get_all_repos() -- confirm that
    # results cannot exceed a single page.
    result = []

    for repo in get_all_repos(api):
        # For each team with access to this repo: its access level and the
        # logins of its members (as a set, for cheap membership tests).
        teams_data = {
            team.name: {
                "access": get_access_level(team.permission),
                "users": {
                    member.login
                    for member in api.teams.list_members_in_org(GITHUB_ORG, team.slug)
                },
            }
            for team in api.repos.list_teams(repo.name)
        }

        # "direct" here means we don't list each individual member of teams that
        # also have access to the repo:
        users = api.repos.list_collaborators(repo.name, affiliation="direct")

        for user in sorted(users, key=lambda u: u.login):
            user_access = get_access_level(user.permissions)
            if user_access is None:
                # Without a known level there is nothing to compare; comparing
                # None to an AccessLevel would raise a TypeError.
                continue

            for team, data in teams_data.items():
                if user.login not in data["users"]:
                    continue

                # This repo gives access to a team that already includes the
                # explicitly listed user. See if the user is added with
                # greater permissions, otherwise this user is redundant.
                team_access = data["access"]
                if team_access is None:
                    continue

                if user_access <= team_access:
                    result.append(
                        {
                            "repo": repo.name,
                            "user": user.login,
                            "redundancy": team,
                        }
                    )

    print(json.dumps(result, sort_keys=True))
    return True
|
|
|
|
|
|
def cmd_repos_remove_redundant_users(api, args):
    """Remove directly added users from repos where a team already covers them.

    A user is covered when a team with access to the repo includes that user
    and grants at least the user's access level. With args.dry_run set, no
    removal is requested and every removal is reported as successful.

    Prints a JSON list with one entry per attempted removal (repo, user,
    success, plus the HTTP code and reason on failure).

    Returns:
        True if every attempted removal succeeded, False otherwise.
    """
    # NOTE(review): the team, member, and collaborator listings below are not
    # paginated, unlike the repo listing in get_all_repos() -- confirm that
    # results cannot exceed a single page.
    result = []
    outcome = True

    for repo in get_all_repos(api):
        # For each team with access to this repo: its access level and the
        # logins of its members (as a set, for cheap membership tests).
        teams_data = {
            team.name: {
                "access": get_access_level(team.permission),
                "users": {
                    member.login
                    for member in api.teams.list_members_in_org(GITHUB_ORG, team.slug)
                },
            }
            for team in api.repos.list_teams(repo.name)
        }

        # "direct" here means we don't list each individual member of teams that
        # also have access to the repo:
        users = api.repos.list_collaborators(repo.name, affiliation="direct")

        for user in sorted(users, key=lambda u: u.login):
            user_access = get_access_level(user.permissions)
            if user_access is None:
                # Without a known level we cannot prove the user is covered,
                # so leave the user alone. (Comparing None to an AccessLevel
                # would also raise a TypeError.)
                continue

            for data in teams_data.values():
                if user.login not in data["users"]:
                    continue

                # This repo gives access to a team that already includes the
                # explicitly listed user. See if the user is added with
                # greater permissions, otherwise this user is redundant.
                team_access = data["access"]
                if team_access is None or user_access > team_access:
                    continue

                try:
                    if not args.dry_run:
                        api.repos.remove_collaborator(repo.name, user.login)
                except HTTPError as err:
                    result.append(
                        {
                            "repo": repo.name,
                            "user": user.login,
                            "success": False,
                            "code": err.code,
                            "reason": err.reason,
                        }
                    )
                    outcome = False
                else:
                    result.append(
                        {
                            "repo": repo.name,
                            "user": user.login,
                            "success": True,
                        }
                    )

                # One covering team is enough. Stop here so the same user is
                # not removed (and reported) again for every further team.
                break

    print(json.dumps(result, sort_keys=True))
    return outcome
|
|
|
|
|
|
def cmd_repos_add_team(api, args):
    """Give a team access to every repo in the organization.

    args.team names the team by name or slug (case-insensitive), and
    args.access_level is the lower-case name of an AccessLevel. With
    args.dry_run set, no change is requested and every repo is reported as
    successful.

    Prints a JSON list with one entry per repo (repo, success, plus the HTTP
    code and reason on failure).

    Returns:
        True if all repos were handled successfully. False if the access level
        or team is invalid (explained on stderr), or if any update failed.
    """
    if args.access_level not in [level.name.lower() for level in AccessLevel]:
        print_err(
            f"The given access level '{args.access_level}' is not valid. See --help.\n"
        )
        return False

    access_level = access_level_to_github(AccessLevel[args.access_level.upper()])

    # Resolve the given team name to a slug:
    team_slug = None
    teams = sorted(api.teams.list(GITHUB_ORG), key=lambda t: t.name)
    wanted = args.team.lower()
    for team in teams:
        if wanted in (team.name.lower(), team.slug.lower()):
            team_slug = team.slug
            break

    if not team_slug:
        print_err(
            f"Team '{args.team}' is not valid for the '{GITHUB_ORG}' organization."
        )
        # Built outside the f-string: reusing the enclosing quote character
        # inside an f-string is a syntax error before Python 3.12.
        team_names = ", ".join(f"'{t.name}'" for t in teams)
        print_err(f"Must be one of {team_names}.")
        return False

    result = []
    outcome = True

    for repo in get_all_repos(api):
        try:
            if not args.dry_run:
                api.teams.add_or_update_repo_permissions_in_org(
                    GITHUB_ORG, team_slug, repo.name, access_level
                )
        except HTTPError as err:
            result.append(
                {
                    "repo": repo.name,
                    "success": False,
                    "code": err.code,
                    "reason": err.reason,
                }
            )
            outcome = False
        else:
            result.append(
                {
                    "repo": repo.name,
                    "success": True,
                }
            )

    print(json.dumps(result, sort_keys=True))
    return outcome
|
|
|
|
|
|
def main():
    """Parse the command line and run the selected command.

    Returns:
        The result of the command (True on success). False when no command
        was selected, when an API error surfaced from the command, or when
        the user interrupted the run.
    """
    top_parser = argparse.ArgumentParser(
        description="Helpers for automated Zeek repo administration on Github."
    )

    top_parser.add_argument(
        "--auth-token",
        default=None,
        help="Your Github API token. You can also use the GITHUB_TOKEN environment "
        "variable to provide this. As a fallback, this script tries to use your "
        "gh configuration, if available.",
    )

    cmd_parser = top_parser.add_subparsers(
        title="commands",
        dest="command",
        help="See `%(prog)s <command> -h` for per-command usage info.",
    )

    repos_parser = cmd_parser.add_parser(
        "repos", help="Do something for every repo in the Zeek organization."
    )

    repos_cmd_parser = repos_parser.add_subparsers(
        title="commands",
        dest="command",
        help="See `%(prog)s <command> -h` for per-command usage info.",
    )

    repos_list_access_cmd_parser = repos_cmd_parser.add_parser(
        "list-access",
        help="List all users and teams that have access to a repo, along with the access level.",
    )
    repos_list_access_cmd_parser.set_defaults(run_cmd=cmd_repos_list_access)

    repos_list_redundant_users_cmd_parser = repos_cmd_parser.add_parser(
        "list-redundant-users",
        help="List users already covered via teams providing at least the same access level.",
    )
    repos_list_redundant_users_cmd_parser.set_defaults(
        run_cmd=cmd_repos_list_redundant_users
    )

    repos_remove_redundant_user_cmd_parser = repos_cmd_parser.add_parser(
        "remove-redundant-users",
        help="Remove users already included via a team that provides at least the same access level.",
    )
    repos_remove_redundant_user_cmd_parser.set_defaults(
        run_cmd=cmd_repos_remove_redundant_users
    )
    repos_remove_redundant_user_cmd_parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Don't make any changes, just pretend API calls succeed",
    )

    repos_add_team_cmd_parser = repos_cmd_parser.add_parser(
        "add-team",
        help="Add the given team to each repository at the given access level. Teams that already have access are updated to the provided level.",
    )
    repos_add_team_cmd_parser.set_defaults(run_cmd=cmd_repos_add_team)

    repos_add_team_cmd_parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Don't make any changes, just pretend API calls succeed",
    )
    repos_add_team_cmd_parser.add_argument(
        "--team", required=True, help="The name or slug of the team. Case-insensitive."
    )
    # Built outside the f-string: reusing the enclosing quote character inside
    # an f-string is a syntax error before Python 3.12.
    level_names = ", ".join(f"'{level.name.lower()}'" for level in AccessLevel)
    repos_add_team_cmd_parser.add_argument(
        "--access-level",
        required=True,
        help=f"The access level. One of {level_names}.",
    )

    if "argcomplete" in sys.modules:
        argcomplete.autocomplete(top_parser)

    args = top_parser.parse_args()

    # Only the leaf commands set run_cmd. When the user gave no command, or
    # just "repos", show the usage instead of failing on the missing
    # attribute -- and do so before insisting on an API token.
    run_cmd = getattr(args, "run_cmd", None)
    if run_cmd is None:
        top_parser.print_help(sys.stderr)
        return False

    api = GhApi(owner=GITHUB_ORG, token=get_api_token(args))

    try:
        return run_cmd(api, args)
    except HTTPError as err:
        print_err(f"API error at {err.url}: {err.code}, {err.reason}")
        return False
    except KeyboardInterrupt:
        return False
|
|
|
|
|
|
if __name__ == "__main__":
    # Map main()'s truthy/falsy result onto the conventional exit codes.
    succeeded = main()
    sys.exit(0 if succeeded else 1)
|