diff --git a/src/appengine/libs/access.py b/src/appengine/libs/access.py index bf54ea82905..e66b3b3e47d 100644 --- a/src/appengine/libs/access.py +++ b/src/appengine/libs/access.py @@ -29,11 +29,7 @@ def _is_privileged_user(email): if local_config.AuthConfig().get('all_users_privileged'): return True - privileged_user_emails = (db_config.get_value('privileged_users') or - '').splitlines() - return any( - utils.emails_equal(email, privileged_user_email) - for privileged_user_email in privileged_user_emails) + return auth.is_current_user_privileged(email) def _is_blacklisted_user(email): diff --git a/src/appengine/libs/auth.py b/src/appengine/libs/auth.py index 9188bfeab01..f60621519e3 100644 --- a/src/appengine/libs/auth.py +++ b/src/appengine/libs/auth.py @@ -35,6 +35,8 @@ User = collections.namedtuple('User', ['email']) BEARER_PREFIX = 'Bearer ' +PRIVILEGED_USER_ROLE = 'roles/privilegedUser' + class AuthError(Exception): """Auth error.""" @@ -71,6 +73,37 @@ def is_current_user_admin(): return bool(key.get()) +@memoize.wrap(memoize.FifoInMemory(1)) +def _has_iam_permission(permission, user_email, project_id): + """Check if a user has a specific IAM permission.""" + if environment.is_local_development(): + # For local development, we can grant all permissions. + return True + + try: + service = build('cloudresourcemanager', 'v1') + body = {'permissions': [permission]} + # pylint: disable=no-member + response = service.projects().testIamPermissions( + resource=project_id, body=body).execute() + + return permission in response.get('permissions', []) + except Exception as e: + logs.error(f'Failed to check IAM permission: {e}') + return False + + +def is_current_user_privileged(user_email): + """Returns whether the current user has privileged access.""" + + project_id = utils.get_application_id() + role_id = f'projects/{project_id}/{PRIVILEGED_USER_ROLE}' + logs.info(f'Test privileged role: {role_id}') + has_iam_privilege = _has_iam_permission(role_id, user_email, project_id) + + return has_iam_privilege + + @memoize.wrap(memoize.FifoInMemory(1)) def _project_number_from_id(project_id): """Get the project number from project ID."""