import sqlalchemy as sa
import tahrir_api.model as m
from flask import abort, g, jsonify, request
from ..app import csrf, oidc
from ..utils.avatar import hash_email
from ..utils.user import (
_populate_access_user,
create_person,
get_person,
get_user_badge_info,
need_access_user,
)
from . import blueprint as bp
[docs]
@bp.route("/api/users/search/<search_string>", methods=["GET"])
def search_users_by_string(search_string: str):
"""
Search endpoint that returns users matching the search string
"""
# We need to have a function for searching users in Tahrir API
# Instead of doing this over here like this
begin = request.args.get("begin", 0, type=int)
limit = request.args.get("limit", 100, type=int)
collection = (
g.tahrirdb.get_all_persons(include_opted_out=True)
.filter(sa.func.lower(m.Person.nickname).like(f"%{search_string.lower()}%"))
.all()
)
# Suggested function should also include pagination feature
result = {
"users": [
{
"id": item.id,
"bio": item.bio if item.bio else None,
"created_on": item.created_on.timestamp() if item.created_on else None,
"email": hash_email(item.avatar),
"last_login": item.last_login.timestamp() if item.last_login else None,
"nickname": item.nickname,
"opt_out": item.opt_out,
"rank": item.rank,
"website": item.website,
}
for item in collection[begin : begin + (limit if limit < 100 else 100)]
],
"castup": len(collection),
}
return jsonify(result)
[docs]
@bp.route("/api/users/<string:user_id>", methods=["GET"])
def get_user_by_id(user_id: str):
"""Endpoint to fetch the user based on matching id."""
user = get_person(user_id)
if not user:
abort(404, f"No such user {user_id!r}")
requester_email = None
if request.authorization:
try:
_populate_access_user()
requester_email = g.token_email
except Exception:
pass
if user.opt_out and user.email != requester_email:
abort(404, f"User {user_id!r} has opted out.")
# Get badge info using utility function
badges_info = get_user_badge_info(user)
return jsonify(
{
"user": {
"nickname": user.nickname,
"mail": hash_email(user.avatar),
"created_on": user.created_on if user.created_on else None,
"opt_out": user.opt_out,
"rank": badges_info["rank"],
},
**badges_info,
}
)
[docs]
@bp.route("/api/users/opt_out", methods=["PUT"])
@csrf.exempt
@oidc.accept_token()
@need_access_user
def user_opt_out():
"""Endpoint to update user account settings."""
data = request.get_json()
if not data.get("opt_out"):
abort(400, "No data provided")
g.token_person.opt_out = data.get("opt_out")
g.tahrirdb.session.commit()
return jsonify({"message": "User updated successfully"})
[docs]
@bp.route("/api/users/diff/<string:id_a>/<string:id_b>", methods=["GET"])
@csrf.exempt
@oidc.accept_token()
@need_access_user
def get_user_diff(id_a: str, id_b: str):
"""Endpoint to compare badges between two users."""
user_a = get_person(id_a)
user_b = get_person(id_b)
if not user_a:
abort(404, f"No such user {id_a!r}")
if not user_b:
abort(404, f"No such user {id_b!r}")
if user_a.opt_out and user_a.email != g.token_email:
abort(404, f"User {id_a!r} has opted out.")
if user_b.opt_out and user_b.email != g.token_email:
abort(404, f"User {id_b!r} has opted out.")
if user_a.email != g.token_email:
abort(401)
# Get badge info using utility function
user_a_info = get_user_badge_info(user_a)
user_b_info = get_user_badge_info(user_b)
# Get raw badge objects for diffing
user_a_badges = [a.badge for a in user_a.assertions]
user_b_badges = [a.badge for a in user_b.assertions]
# Diff badges
user_a_unique_badges = []
user_b_unique_badges = []
combined_badges = list(sorted(set(user_a_badges + user_b_badges), key=lambda badge: badge.id))
shared_badges = []
for badge in combined_badges:
if badge in user_a_badges and badge not in user_b_badges:
user_a_unique_badges.append(badge)
elif badge in user_b_badges and badge not in user_a_badges:
user_b_unique_badges.append(badge)
elif badge in user_a_badges and badge in user_b_badges:
shared_badges.append(badge)
return jsonify(
{
"user_a": {
"id": user_a.id,
"nickname": user_a.nickname,
"avatar": hash_email(user_a.avatar),
"badges_count": len(user_a_info["badges"]),
"percent_earned": user_a_info["percent_earned"],
"rank": user_a_info["rank"],
"percentile": user_a_info["percentile"],
},
"user_b": {
"id": user_b.id,
"nickname": user_b.nickname,
"avatar": hash_email(user_b.avatar),
"badges_count": len(user_b_info["badges"]),
"percent_earned": user_b_info["percent_earned"],
"rank": user_b_info["rank"],
"percentile": user_b_info["percentile"],
},
"user_a_badges": user_a_info["badges"],
"user_b_badges": user_b_info["badges"],
"user_a_unique_badges": [
b
for b in user_a_info["badges"]
if any(ub.id == b["id"] for ub in user_a_unique_badges)
],
"user_b_unique_badges": [
b
for b in user_b_info["badges"]
if any(ub.id == b["id"] for ub in user_b_unique_badges)
],
"shared_badges": [
b for b in user_a_info["badges"] if any(sb.id == b["id"] for sb in shared_badges)
],
}
)
[docs]
@bp.route("/api/users/", methods=["POST"])
@csrf.exempt
@oidc.accept_token()
def after_login():
"""Create the person if it does not exist."""
if not request.authorization:
abort(403)
_populate_access_user()
person = create_person(g.token_profile["preferred_username"], g.token_profile["email"])
if not person:
abort(404)
return jsonify(person.as_dict())