Source code for tahrir.utils.user

import logging
import typing
from functools import wraps
from urllib.parse import quote_plus

from authlib.integrations.flask_oauth2 import current_token
from flask import abort, current_app, g, redirect, request, session, url_for
from flask_oidc.model import User as OIDCUser

from tahrir.defaults import TAHRIR_DISPLAY_TAGS

from .badge import badge_json_generator

if typing.TYPE_CHECKING:
    import tahrir_api

log = logging.getLogger(__name__)


[docs] class User(OIDCUser): def __init__(self, ext): super().__init__(ext) self._person = None self._awarded_assertions = None
[docs] def reset_cache(self): self._person = None self._awarded_assertions = None
def _has_token(self): return session.get("oidc_auth_token") is not None @property def logged_in(self): return self._has_token() and self.person is not None @property def email(self): if not self.logged_in: return None return f"{self.name}@{current_app.config['TAHRIR_EMAIL_DOMAIN']}" @property def person(self): if self.name is None: return None if self._person is None: self._person = g.tahrirdb.get_person(nickname=self.name) return self._person @property def awarded_assertions(self): if self.name is None: return [] if self._awarded_assertions is None: self._awarded_assertions = get_awarded_assertions(self.name) return self._awarded_assertions @property def is_admin(self): return ( len(set(self.groups).intersection(set(current_app.config["TAHRIR_ADMIN_GROUPS"]))) > 0 )
[docs] def on_authorized(sender, **kwargs): create_person(g.oidc_user.name, g.oidc_user.profile["email"]) g.oidc_user.reset_cache()
[docs] def create_person(nickname: str, email: str) -> "tahrir_api.model.Person": if current_app.config["TAHRIR_USE_OPENID_EMAIL"]: avatar = None else: avatar = email email = f"{nickname}@{current_app.config['TAHRIR_EMAIL_DOMAIN']}" existing = g.tahrirdb.get_person(person_email=email) if not existing: # Keep adding underscores until we get a default nickname # that isn't already used. while g.tahrirdb.get_person(nickname=nickname): nickname += "_" g.tahrirdb.add_person(email=email, nickname=nickname, avatar=avatar) log.info("A new person logged in, creating the Person profile for %s", email) else: # User exists, update the avatar if existing._avatar != avatar: existing._avatar = avatar g.tahrirdb.session.commit() # Note that they have logged in if we are installed with a newer version of # the db API that supports this. if hasattr(g.tahrirdb, "note_login"): g.tahrirdb.note_login(person_email=email) return g.tahrirdb.get_person(person_email=email)
[docs] def get_person(id_or_nickname): """Attempt to get a user by their id or nickname, returning None if we fail.""" if id_or_nickname is None: return None user = g.tahrirdb.get_person(nickname=id_or_nickname) if user: return user else: try: # We cast user_id to an integer so that Postgres doesn't # get upset about comparing what is potentially a string # to an integer column. return g.tahrirdb.get_person(id=int(id_or_nickname)) except ValueError: return None
[docs] def get_awarded_assertions(username): if username is None: return [] email = f"{username}@{current_app.config['TAHRIR_EMAIL_DOMAIN']}" assertions = g.tahrirdb.get_assertions_by_email(email) if assertions is False: # tahrir-api returns False when the user does not exist. assertions = [] return assertions
[docs] def require_login(view_func): """ Use this to decorate view functions that require a user to be logged in. If the user is not already logged in, they will be sent to the Provider to log in, after which they will be returned. .. versionadded:: 1.0 This was :func:`check` before. """ @wraps(view_func) def decorated(*args, **kwargs): if not g.oidc_user.logged_in: redirect_uri = "{login}?next={here}".format( login=url_for("oidc_auth.login"), here=quote_plus(request.url), ) return redirect(redirect_uri) return view_func(*args, **kwargs) return decorated
[docs] def require_admin(view_func): """ Use this to decorate view functions that require a user to be logged in. This assumes the user is already logged-in. .. versionadded:: 1.0 This was :func:`check` before. """ @wraps(view_func) def decorated(*args, **kwargs): if not g.oidc_user.is_admin: abort(403, "Unauthorized: admins only.") return view_func(*args, **kwargs) return decorated
def _populate_access_user(): if not current_app.config.get("OIDC_ENABLED", True): userinfo = current_app.config.get("OIDC_TESTING_PROFILE", {}) else: current_token["access_token"] = request.authorization.token userinfo = g._oidc_auth.userinfo(token=current_token) g.token_profile = userinfo nickname = userinfo.get("preferred_username") or userinfo.get("nickname") if nickname: g.token_email = f"{nickname}@{current_app.config['TAHRIR_EMAIL_DOMAIN']}" g.token_person = g.tahrirdb.get_person(nickname=nickname) else: g.token_email = None g.token_person = None
[docs] def need_access_user(view_func): @wraps(view_func) def decorated(*args, **kwargs): if not request.authorization: abort(401, "Missing authorization") _populate_access_user() if g.token_person is None: abort(403, "Unknown user") return view_func(*args, **kwargs) return decorated
[docs] def need_access_root(view_func): @wraps(view_func) def decorated(*args, **kwargs): groups = g.token_profile.get("groups", []) admin_groups = current_app.config["TAHRIR_ADMIN_GROUPS"] if not set(groups).intersection(set(admin_groups)): abort(403, "Unauthorized: admins only.") return view_func(*args, **kwargs) return decorated
[docs] def get_user_badge_info(user): """Returns a dictionary of the user badge information with serialized badges""" # Get user badges badges = [assertion.badge for assertion in user.assertions] # Get total number of unique badges in the system total_badges = g.tahrirdb.get_all_badges().count() # Get percentage of badges earned by user percent_earned = (float(len(badges)) / float(total_badges)) * 100 if total_badges > 0 else 0 # Get rank of user rank = user.rank or 0 # Get total number of users user_count = g.tahrirdb.get_all_persons().count() # Get users standing percentile = round((rank / user_count) * 100, 2) if user_count > 0 else 0 # Get user assertions assertions = sorted(user.assertions, key=lambda item: item.issued_on, reverse=True) # Initialize empty list to store serailized badges serialized_badges = [] # Set the classifications of the badges classified = {name: [] for name in TAHRIR_DISPLAY_TAGS} for indx, item in enumerate(assertions): badge = badge_json_generator(item.badge, withasserts=False) serialized_badges.append({**badge}) for name in classified.keys(): if name in item.badge.tags: classified[name].append(indx) return { "badges": serialized_badges, "classified": classified, "percentile": percentile, "percent_earned": round(percent_earned, 2), "rank": rank, "total_badges": total_badges, }