Source code for tahrir.utils.user

from functools import wraps
from urllib.parse import quote_plus

from flask import abort, current_app, g, redirect, request, session, url_for
from flask_oidc.model import User as OIDCUser


[docs] class User(OIDCUser): def __init__(self, ext): super().__init__(ext) self._person = None self._awarded_assertions = None
[docs] def reset_cache(self): self._person = None self._awarded_assertions = None
def _has_token(self): return session.get("oidc_auth_token") is not None @property def logged_in(self): return self._has_token() and self.person is not None @property def email(self): if not self.logged_in: return None return f"{self.name}@{current_app.config['TAHRIR_EMAIL_DOMAIN']}" @property def person(self): if self.name is None: return None if self._person is None: self._person = g.tahrirdb.get_person(nickname=self.name) return self._person @property def awarded_assertions(self): if self.name is None: return [] if self._awarded_assertions is None: self._awarded_assertions = get_awarded_assertions(self.name) return self._awarded_assertions @property def is_admin(self): return ( len(set(self.groups).intersection(set(current_app.config["TAHRIR_ADMIN_GROUPS"]))) > 0 )
[docs] def on_authorized(sender, **kwargs): nickname = g.oidc_user.name if current_app.config["TAHRIR_USE_OPENID_EMAIL"]: email = g.oidc_user.profile["email"] avatar = None else: email = f"{nickname}@{current_app.config['TAHRIR_EMAIL_DOMAIN']}" avatar = g.oidc_user.profile["email"] existing = g.tahrirdb.get_person(person_email=email) if not existing: # Keep adding underscores until we get a default nickname # that isn't already used. while g.tahrirdb.get_person(nickname=nickname): nickname += "_" g.tahrirdb.add_person(email=email, nickname=nickname, avatar=avatar) g.oidc_user.reset_cache() else: # User exists, update the avatar if existing._avatar != avatar: existing._avatar = avatar g.tahrirdb.session.commit() g.oidc_user.reset_cache() # Note that they have logged in if we are installed with a newer version of # the db API that supports this. if hasattr(g.tahrirdb, "note_login"): g.tahrirdb.note_login(person_email=email)
[docs] def get_person(id_or_nickname): """Attempt to get a user by their id or nickname, returning None if we fail.""" if id_or_nickname is None: return None user = g.tahrirdb.get_person(nickname=id_or_nickname) if user: return user else: try: # We cast user_id to an integer so that Postgres doesn't # get upset about comparing what is potentially a string # to an integer column. return g.tahrirdb.get_person(id=int(id_or_nickname)) except ValueError: return None
[docs] def get_awarded_assertions(username): if username is None: return [] email = f"{username}@{current_app.config['TAHRIR_EMAIL_DOMAIN']}" assertions = g.tahrirdb.get_assertions_by_email(email) if assertions is False: # tahrir-api returns False when the user does not exist. assertions = [] return assertions
[docs] def require_login(view_func): """ Use this to decorate view functions that require a user to be logged in. If the user is not already logged in, they will be sent to the Provider to log in, after which they will be returned. .. versionadded:: 1.0 This was :func:`check` before. """ @wraps(view_func) def decorated(*args, **kwargs): if not g.oidc_user.logged_in: redirect_uri = "{login}?next={here}".format( login=url_for("oidc_auth.login"), here=quote_plus(request.url), ) return redirect(redirect_uri) return view_func(*args, **kwargs) return decorated
[docs] def require_admin(view_func): """ Use this to decorate view functions that require a user to be logged in. This assumes the user is already logged-in. .. versionadded:: 1.0 This was :func:`check` before. """ @wraps(view_func) def decorated(*args, **kwargs): if not g.oidc_user.is_admin: abort(403, "Unauthorized: admins only.") return view_func(*args, **kwargs) return decorated