# -*- coding: utf-8 -*-
# admin.py - Waqas Bhatti (wbhatti@astro.princeton.edu) - Aug 2018
# License: MIT - see the LICENSE file for the full text.
"""This contains functions to drive admin related actions (listing users,
editing users, change user roles).
"""
#############
## LOGGING ##
#############
import logging
from typing import Tuple
from types import SimpleNamespace
# get a logger
LOGGER = logging.getLogger(__name__)
#############
## IMPORTS ##
#############
from sqlalchemy import select, asc, column, table as SqlTable
from .session import auth_session_exists
from ..permissions import pii_hash, load_permissions_json
from authnzerver.actions.utils import get_procdb_permjson
######################################################
## user info columns returned by all functions here ##
######################################################
[docs]def user_info_columns(table: SqlTable) -> Tuple:
"""Returns the column expression for all required info retrieved by
a user lookup.
*table* is the users SQLAlchemy table object. Required to preserve type
information for the columns.
"""
return (
table.c.user_id,
table.c.system_id,
table.c.full_name,
table.c.email,
table.c.email_verified,
table.c.is_active,
table.c.last_login_try,
table.c.last_login_success,
table.c.failed_login_tries,
table.c.created_on,
table.c.last_updated,
table.c.user_role,
table.c.extra_info,
table.c.emailverify_sent_datetime,
table.c.emailforgotpass_sent_datetime,
table.c.emailchangepass_sent_datetime,
)
###################
## LISTING USERS ##
###################
[docs]def list_users(
payload: dict,
raiseonfail: bool = False,
override_authdb_path: str = None,
override_permissions_json: str = None,
config: SimpleNamespace = None,
) -> dict:
"""This lists users.
FIXME: add permissions checks to this instead of relying on a frontend to
filter out users who aren't allowed to perform this action.
Parameters
----------
payload : dict
This is the input payload dict. Required items:
- user_id: int or None. If None, all users will be returned
In addition to these items received from an authnzerver client, the
payload must also include the following keys (usually added in by a
wrapping function):
- reqid: int or str
- pii_salt: str
raiseonfail : bool
If True, will raise an Exception if something goes wrong.
override_authdb_path : str or None
If given as a str, is the alternative path to the auth DB.
override_permissions_json : str or None
If given as a str, is the alternative path to the permissions JSON
to use.
config : SimpleNamespace object or None
An object containing systemwide config variables as attributes. This is
useful when the wrapping function needs to pass in some settings
directly from environment variables.
Returns
-------
dict
The dict returned is of the form::
{'success': True or False,
'user_info': list of dicts, one per user,
'messages': list of str messages if any}
The dicts per user will contain the following items::
{'user_id','full_name', 'email',
'is_active','created_on','user_role',
'last_login_try','last_login_success'}
"""
engine, meta, permjson, dbpath = get_procdb_permjson(
override_authdb_path=override_authdb_path,
override_permissions_json=override_permissions_json,
raiseonfail=raiseonfail,
)
for key in ("reqid", "pii_salt"):
if key not in payload:
LOGGER.error(
"Missing %s in payload dict. Can't process this request." % key
)
return {
"success": False,
"failure_reason": (
"invalid request: missing '%s' in request" % key
),
"user_info": None,
"messages": ["Invalid user info request."],
}
if "user_id" not in payload:
LOGGER.error(
"[%s] Invalid user list request, missing %s."
% (payload["reqid"], "user_id")
)
return {
"success": False,
"failure_reason": (
"invalid request: missing 'user_id' in request"
),
"user_info": None,
"messages": ["No user_id provided."],
}
user_id = payload["user_id"]
try:
users = meta.tables["users"]
if user_id is None:
s = (
select(*user_info_columns(users))
.order_by(asc(users.c.user_id))
.select_from(users)
)
else:
s = (
select(*user_info_columns(users))
.order_by(asc(users.c.user_id))
.select_from(users)
.where(users.c.user_id == user_id)
)
with engine.begin() as conn:
result = conn.execute(s)
rows = result.fetchall()
try:
serialized_result = [dict(x) for x in rows]
LOGGER.info(
"[%s] User lookup request succeeded. "
"user_id provided: %s."
% (
payload["reqid"],
pii_hash(payload["user_id"], payload["pii_salt"]),
)
)
return {
"success": True,
"user_info": serialized_result,
"messages": ["User look up successful."],
}
except Exception as e:
LOGGER.error(
"[%s] User lookup request failed. "
"user_id provided: %s. Exception: %r"
% (
payload["reqid"],
pii_hash(payload["user_id"], payload["pii_salt"]),
e,
)
)
if raiseonfail:
raise
return {
"success": False,
"failure_reason": "target user not found in DB",
"user_info": None,
"messages": ["User look up failed."],
}
except Exception as e:
LOGGER.error(
"[%s] User lookup request failed. "
"user_id provided: %s. Exception: %r"
% (
payload["reqid"],
pii_hash(payload["user_id"], payload["pii_salt"]),
e,
)
)
if raiseonfail:
raise
return {
"success": False,
"user_info": None,
"messages": ["User look up failed."],
}
[docs]def get_user_by_email(
payload: dict,
raiseonfail: bool = False,
override_authdb_path: str = None,
config: SimpleNamespace = None,
) -> dict:
"""
This gets a user's information using their email address.
FIXME: add permissions checks to this instead of relying on a frontend to
filter out users who aren't allowed to perform this action.
Parameters
----------
payload : dict
This is the input payload dict. Required items:
- email: str
In addition to these items received from an authnzerver client, the
payload must also include the following keys (usually added in by a
wrapping function):
- reqid: int or str
- pii_salt: str
raiseonfail : bool
If True, will raise an Exception if something goes wrong.
override_authdb_path : str or None
If given as a str, is the alternative path to the auth DB.
config : SimpleNamespace object or None
An object containing systemwide config variables as attributes. This is
useful when the wrapping function needs to pass in some settings
directly from environment variables.
Returns
-------
dict
The dict returned is of the form::
{'success': True or False,
'user_info': a user info dict,
'messages': list of str messages if any}
The user info dict will contain the following items::
{'user_id','system_id', 'full_name', 'email',
'is_active','created_on','user_role',
'last_login_try','last_login_success'}
"""
engine, meta, permjson, dbpath = get_procdb_permjson(
override_authdb_path=override_authdb_path,
override_permissions_json=None,
raiseonfail=raiseonfail,
)
for key in ("reqid", "pii_salt"):
if key not in payload:
LOGGER.error(
"Missing %s in payload dict. Can't process this request." % key
)
return {
"success": False,
"user_info": None,
"failure_reason": (
"invalid request: missing '%s' in request" % key
),
"messages": ["Invalid user info request."],
}
if "email" not in payload:
LOGGER.error(
"[%s] Invalid user lookup request, missing %s."
% (payload["reqid"], "email")
)
return {
"success": False,
"user_info": None,
"failure_reason": "invalid request: missing 'email' in request",
"messages": ["email provided."],
}
email = payload["email"]
try:
users = meta.tables["users"]
s = (
select(*user_info_columns(users))
.order_by(asc(users.c.user_id))
.select_from(users)
.where(users.c.email == email)
)
with engine.begin() as conn:
result = conn.execute(s)
rows = result.first()
try:
serialized_result = dict(rows._mapping)
LOGGER.info(
"[%s] User lookup request succeeded. "
"email provided: %s."
% (
payload["reqid"],
pii_hash(payload["email"], payload["pii_salt"]),
)
)
return {
"success": True,
"user_info": serialized_result,
"messages": ["User look up successful."],
}
except Exception as e:
LOGGER.error(
"[%s] User lookup request failed. "
"email provided: %s. Exception: %r"
% (
payload["reqid"],
pii_hash(payload["email"], payload["pii_salt"]),
e,
)
)
if raiseonfail:
raise
return {
"success": False,
"user_info": None,
"failure_reason": "user email not found in DB",
"messages": ["User look up failed."],
}
except Exception as e:
LOGGER.error(
"[%s] User lookup request failed. "
"email provided: %s. Exception: %r"
% (
payload["reqid"],
pii_hash(payload["email"], payload["pii_salt"]),
e,
)
)
if raiseonfail:
raise
return {
"success": False,
"user_info": None,
"failure_reason": "exception when checking the DB",
"messages": ["User look up failed."],
}
[docs]def lookup_users(
payload: dict,
raiseonfail: bool = False,
override_authdb_path: str = None,
config: SimpleNamespace = None,
) -> dict:
"""This looks up users by a given property.
FIXME: add permissions checks to this instead of relying on a frontend to
filter out users who aren't allowed to perform this action.
Valid properties are all the columns in the users table, except for the
password column.
Parameters
----------
payload : dict
This is the input payload dict. Required items:
- by (str): the property column to use to look up the user by
- match (object): the required value of the property. Note that in most
cases, this will be coerced to a string to compare it to the database
value.
If by == 'extra_info', then match must be a dict of the form:
{'extra_info_key': extra_info_value}
to match one or more keys inside the extra_info JSON column to the
specified value.
In addition to these items received from an authnzerver client, the
payload must also include the following keys (usually added in by a
wrapping function):
- reqid: int or str
- pii_salt: str
raiseonfail : bool
If True, will raise an Exception if something goes wrong.
override_authdb_path : str or None
If given as a str, is the alternative path to the auth DB.
config : SimpleNamespace object or None
An object containing systemwide config variables as attributes. This is
useful when the wrapping function needs to pass in some settings
directly from environment variables.
Returns
-------
dict
The dict returned is of the form::
{'success': True or False,
'user_info': a user info dict,
'messages': list of str messages if any}
The user info dict will contain the following items::
{'user_id','system_id', 'full_name', 'email',
'is_active','created_on','user_role',
'last_login_try','last_login_success'}
"""
engine, meta, permjson, dbpath = get_procdb_permjson(
override_authdb_path=override_authdb_path,
override_permissions_json=None,
raiseonfail=raiseonfail,
)
for key in ("reqid", "pii_salt"):
if key not in payload:
LOGGER.error(
"Missing %s in payload dict. Can't process this request." % key
)
return {
"success": False,
"user_info": None,
"failure_reason": (
"invalid request: missing '%s' in request" % key
),
"messages": ["Invalid user info request."],
}
for key in ("by", "match"):
if key not in payload:
LOGGER.error(
"[%s] Invalid user lookup request, missing %s."
% (payload["reqid"], key)
)
return {
"success": False,
"user_info": None,
"failure_reason": (
"invalid request: missing '%s' in request" % key
),
"messages": ["Invalid match condition provided."],
}
lookup_by = payload["by"]
lookup_column = column(lookup_by)
lookup_match = payload["match"]
if (isinstance(lookup_match, dict) and lookup_by != "extra_info") or (
lookup_by == "extra_info" and not isinstance(lookup_match, dict)
):
LOGGER.error(
"[%s] Invalid user lookup request, "
"extra_info selector must provide a dict." % (payload["reqid"],)
)
return {
"success": False,
"user_info": None,
"failure_reason": (
"invalid request: 'by' is 'extra_info' but "
"'match' is not a dict or "
"'match' is a dict and 'by' is not 'extra_info'"
),
"messages": ["Invalid match condition provided."],
}
try:
users = meta.tables["users"]
sel = (
select(*user_info_columns(users))
.order_by(asc(users.c.user_id))
.select_from(users)
)
if isinstance(lookup_match, dict) and lookup_by == "extra_info":
for key, val in lookup_match.items():
# FIXME: check if this is required in Postgres
# FIXME: this appears to be required in SQLite
if "sqlite:///" in dbpath:
sel = sel.where(
users.c.extra_info[key].as_string() == str(val)
)
else:
sel = sel.where(users.c.extra_info[key] == val)
else:
sel = sel.where(lookup_column == lookup_match)
with engine.begin() as conn:
result = conn.execute(sel)
rows = result.fetchall()
try:
serialized_result = [dict(x) for x in rows]
LOGGER.info(
"[%s] User lookup request succeeded." % payload["reqid"]
)
return {
"success": True,
"user_info": serialized_result,
"messages": ["User look up successful."],
}
except Exception as e:
LOGGER.error(
"[%s] User lookup request failed because of %s."
% (payload["reqid"], str(e))
)
if raiseonfail:
raise
return {
"success": False,
"user_info": None,
"failure_reason": "user not found in DB",
"messages": ["User look up failed."],
}
except Exception as e:
LOGGER.error(
"[%s] User lookup request failed because of %s."
% (payload["reqid"], str(e))
)
if raiseonfail:
raise
return {
"success": False,
"user_info": None,
"failure_reason": "exception in DB access",
"messages": ["User look up failed."],
}
###################
## EDITING USERS ##
###################
[docs]def edit_user(
payload: dict,
raiseonfail: bool = False,
override_permissions_json: str = None,
override_authdb_path: str = None,
config=None,
) -> dict:
"""This edits users.
FIXME: add permissions checks to this instead of relying on a frontend to
filter out users who aren't allowed to perform this action.
Parameters
----------
payload : dict
This is the input payload dict. Required items:
- user_id: int, user ID of an admin user or == target_userid
- user_role: str, == 'superuser' or == target_userid user_role
- session_token: str, session token of admin or target_userid token
- target_userid: int, the user to edit
- update_dict: dict, the update dict
In addition to these items received from an authnzerver client, the
payload must also include the following keys (usually added in by a
wrapping function):
- reqid: int or str
- pii_salt: str
Only these items can be edited::
{'full_name', 'email', <- by user and superuser
'is_active','user_role', 'email_verified'} <- by superuser only
User IDs 2 and 3 are reserved for the system-wide anonymous and locked
users respectively, and can't be edited.
raiseonfail : bool
If True, will raise an Exception if something goes wrong.
override_permissions_json : str or None
If given as a str, is the alternative path to the permissions JSON to
load and use for this request. Normally, the path to the permissions
JSON has already been specified as a process-local variable by the main
authnzerver start up routines. If you want to use some other permissions
model JSON (e.g. for testing), provide that here.
Note that we load the permissions JSON from disk every time we need to
take a decision. This might be a bit slower, but allows for much faster
policy changes by just changing the permissions JSON file and not having
to restart the authnzerver.
override_authdb_path : str or None
If given as a str, is the alternative path to the auth DB.
config : SimpleNamespace object or None
An object containing systemwide config variables as attributes. This is
useful when the wrapping function needs to pass in some settings
directly from environment variables.
Returns
-------
dict
The dict returned is of the form::
{'success': True or False,
'user_info': dict, with new user info,
'messages': list of str messages if any}
"""
engine, meta, permjson, dbpath = get_procdb_permjson(
override_authdb_path=override_authdb_path,
override_permissions_json=override_permissions_json,
raiseonfail=raiseonfail,
)
for key in ("reqid", "pii_salt"):
if key not in payload:
LOGGER.error(
"Missing %s in payload dict. Can't process this request." % key
)
return {
"success": False,
"user_info": None,
"failure_reason": (
"invalid request: missing '%s' in request" % key
),
"messages": ["Invalid user edit request."],
}
for key in (
"user_id",
"user_role",
"session_token",
"target_userid",
"update_dict",
):
if key not in payload:
LOGGER.error(
"[%s] Invalid user edit request, missing %s."
% (payload["reqid"], key)
)
return {
"success": False,
"user_info": None,
"failure_reason": (
"invalid request: missing '%s' in request" % key
),
"messages": ["No %s provided." % key],
}
user_id = payload["user_id"]
user_role = payload["user_role"]
session_token = payload["session_token"]
target_userid = payload["target_userid"]
update_dict = payload["update_dict"]
if not isinstance(update_dict, dict):
LOGGER.error(
"[%s] User edit request failed for "
"user_id: %s, role: '%s', session_token: %s, target_userid: %s. "
"An update dict was not provided."
% (
payload["reqid"],
pii_hash(payload["user_id"], payload["pii_salt"]),
payload["user_role"],
pii_hash(payload["session_token"], payload["pii_salt"]),
pii_hash(payload["target_userid"], payload["pii_salt"]),
)
)
return {
"success": False,
"user_info": None,
"failure_reason": "invalid request: no update dict was provided",
"messages": ["No update_dict provided."],
}
if target_userid in (2, 3):
LOGGER.error(
"[%s] User edit request failed for "
"user_id: %s, role: '%s', session_token: %s, target_userid: %s. "
"Editing systemwide anonymous or locked accounts is not allowed."
% (
payload["reqid"],
pii_hash(payload["user_id"], payload["pii_salt"]),
payload["user_role"],
pii_hash(payload["session_token"], payload["pii_salt"]),
pii_hash(payload["target_userid"], payload["pii_salt"]),
)
)
return {
"success": False,
"user_info": None,
"failure_reason": (
"invalid request: can't edit anonymous/locked users"
),
"messages": [
"Editing anonymous/locked user accounts not allowed."
],
}
try:
# the case where the user updates their own info
if target_userid == user_id and user_role in (
"authenticated",
"staff",
):
# check if the user_id == target_userid
# if so, check if session_token is valid and belongs to user_id
session_info = auth_session_exists(
{
"session_token": session_token,
"pii_salt": payload["pii_salt"],
"reqid": payload["reqid"],
},
raiseonfail=raiseonfail,
override_authdb_path=override_authdb_path,
)
# check if the session info user_id matches the provided user_id and
# role
if (
session_info
and session_info["success"]
and session_info["session_info"]["is_active"] is True
and session_info["session_info"]["user_id"] == user_id
and session_info["session_info"]["user_role"] == user_role
):
editeable_elements = {"full_name", "email"}
update_check = set(update_dict.keys()) - editeable_elements
# check if the update keys are valid
if len(update_check) > 0:
LOGGER.error(
"[%s] User edit request failed for "
"user_id: %s, role: '%s', "
"session_token: %s, target_userid: %s. "
"User updating their own info can only"
"do so for full_name, email."
% (
payload["reqid"],
pii_hash(payload["user_id"], payload["pii_salt"]),
payload["user_role"],
pii_hash(
payload["session_token"], payload["pii_salt"]
),
pii_hash(
payload["target_userid"], payload["pii_salt"]
),
)
)
return {
"success": False,
"user_info": None,
"failure_reason": (
"role '%s' can't edit anything other "
"than 'full_name', 'email'" % payload["user_role"]
),
"messages": [
"extra elements in " "update_dict not allowed"
],
}
else:
LOGGER.error(
"[%s] User edit request failed for "
"user_id: %s, role: %s, "
"session_token: %s, target_userid: %s. "
"Valid session not found for the originating user_id."
% (
payload["reqid"],
pii_hash(payload["user_id"], payload["pii_salt"]),
payload["user_role"],
pii_hash(
payload["session_token"], payload["pii_salt"]
),
pii_hash(
payload["target_userid"], payload["pii_salt"]
),
)
)
return {
"success": False,
"failure_reason": (
"no valid session found for originating user_id"
),
"user_info": None,
"messages": [
"User session info not available "
"for this user edit attempt."
],
}
# the case where the superuser updates a user's info (or their own info)
elif user_role == "superuser":
# check if the user_id == target_userid
# if so, check if session_token is valid and belongs to user_id
session_info = auth_session_exists(
{
"session_token": session_token,
"pii_salt": payload["pii_salt"],
"reqid": payload["reqid"],
},
raiseonfail=raiseonfail,
override_authdb_path=override_authdb_path,
)
# check if the session info user_id matches the provided user_id and
# role
if (
session_info
and session_info["success"]
and session_info["session_info"]["is_active"] is True
and session_info["session_info"]["user_id"] == user_id
and session_info["session_info"]["user_role"] == user_role
):
editeable_elements = {
"full_name",
"email",
"is_active",
"user_role",
"email_verified",
}
update_check = set(update_dict.keys()) - editeable_elements
# check if the update keys are valid
if len(update_check) > 0:
LOGGER.error(
"[%s] User edit request failed for "
"user_id: %s, role: %s, "
"session_token: %s, target_userid: %s. "
"Extra non-editable elements found in update_dict."
% (
payload["reqid"],
pii_hash(payload["user_id"], payload["pii_salt"]),
payload["user_role"],
pii_hash(
payload["session_token"], payload["pii_salt"]
),
pii_hash(
payload["target_userid"], payload["pii_salt"]
),
)
)
return {
"success": False,
"user_info": None,
"failure_reason": (
"role: '%s' is not allowed to edit items: '%s'"
% (payload["user_role"], list(update_dict.keys()))
),
"messages": [
"extra elements in " "update_dict not allowed"
],
}
# check if the roles provided are valid
permissions_model = load_permissions_json(permjson)
if "user_role" in update_dict and (
update_dict["user_role"] not in permissions_model["roles"]
):
LOGGER.error(
"[%s] User edit request failed for "
"user_id: %s, role: %s, "
"session_token: %s, target_userid: %s. "
"Invalid role change in update_dict "
"to an non-existent role."
% (
payload["reqid"],
pii_hash(payload["user_id"], payload["pii_salt"]),
payload["user_role"],
pii_hash(
payload["session_token"], payload["pii_salt"]
),
pii_hash(
payload["target_userid"], payload["pii_salt"]
),
)
)
return {
"success": False,
"user_info": None,
"failure_reason": (
"role change requested is not valid"
),
"messages": [
"unknown role change " "request in update_dict"
],
}
else:
LOGGER.error(
"[%s] User edit request failed for "
"user_id: %s, role: %s, "
"session_token: %s, target_userid: %s. "
"Session token provided is invalid "
"for a superuser account."
% (
payload["reqid"],
pii_hash(payload["user_id"], payload["pii_salt"]),
payload["user_role"],
pii_hash(
payload["session_token"], payload["pii_salt"]
),
pii_hash(
payload["target_userid"], payload["pii_salt"]
),
)
)
return {
"success": False,
"user_info": None,
"failure_reason": "invalid session for edit attempt",
"messages": [
"Superuser session info not available "
"for this user edit attempt."
],
}
# any other case is a failure
else:
LOGGER.error(
"[%s] User edit request failed for "
"user_id: %s, role: %s, "
"session_token: %s, target_userid: %s. "
"Session token provided is invalid."
% (
payload["reqid"],
pii_hash(payload["user_id"], payload["pii_salt"]),
payload["user_role"],
pii_hash(payload["session_token"], payload["pii_salt"]),
pii_hash(payload["target_userid"], payload["pii_salt"]),
)
)
return {
"success": False,
"user_info": None,
"failure_reason": "invalid session for edit attempt",
"messages": [
"user_id or session info not available "
"for this user edit attempt."
],
}
#
# all update checks passed, do the update
#
users = meta.tables["users"]
# execute the update
# NOTE: here, we don't filter on is_active to allow unlocking of users
upd = (
users.update()
.where(users.c.user_id == target_userid)
.values(update_dict)
)
with engine.begin() as conn:
conn.execute(upd)
sel = (
select(*user_info_columns(users))
.select_from(users)
.where(users.c.user_id == target_userid)
)
result = conn.execute(sel)
rows = result.first()
try:
serialized_result = dict(rows._mapping)
LOGGER.info(
"[%s] User edit request succeeded for "
"user_id: %s, role: %s, "
"session_token: %s, target_userid: %s."
% (
payload["reqid"],
pii_hash(payload["user_id"], payload["pii_salt"]),
payload["user_role"],
pii_hash(payload["session_token"], payload["pii_salt"]),
pii_hash(payload["target_userid"], payload["pii_salt"]),
)
)
return {
"success": True,
"user_info": serialized_result,
"messages": ["User update successful."],
}
except Exception as e:
LOGGER.error(
"[%s] User edit request failed for "
"user_id: %s, role: %s, "
"session_token: %s, target_userid: %s. Exception: %r"
% (
payload["reqid"],
pii_hash(payload["user_id"], payload["pii_salt"]),
payload["user_role"],
pii_hash(payload["session_token"], payload["pii_salt"]),
pii_hash(payload["target_userid"], payload["pii_salt"]),
e,
)
)
if raiseonfail:
raise
return {
"success": False,
"user_info": None,
"failure_reason": "exception when trying to update user",
"messages": ["User update failed."],
}
except Exception as e:
LOGGER.error(
"[%s] User edit request failed for "
"user_id: %s, role: %s, "
"session_token: %s, target_userid: %s. Exception: %r"
% (
payload["reqid"],
pii_hash(payload["user_id"], payload["pii_salt"]),
payload["user_role"],
pii_hash(payload["session_token"], payload["pii_salt"]),
pii_hash(payload["target_userid"], payload["pii_salt"]),
e,
)
)
if raiseonfail:
raise
return {
"success": False,
"user_info": None,
"failure_reason": "exception when trying to update user",
"messages": ["User update failed."],
}
[docs]def internal_edit_user(
payload: dict,
raiseonfail: bool = False,
override_authdb_path: str = None,
config: SimpleNamespace = None,
) -> dict:
"""Handles editing users. Meant for use internally in a frontend server.
Parameters
----------
payload : dict
The input payload dict. Required items:
- target_userid: int, the user to edit
- update_dict: dict, the changes to make, with each key being a column
value to change in the *users* table.
*update_dict* cannot contain the following fields: user_id, system_id,
password, emailverify_sent_datetime, emailforgotpass_sent_datetime,
emailchangepass_sent_datetime, last_login_success, last_login_try,
failed_login_tries, created_on, and last_updated. These are tracked in
other action functions and should not be changed directly.
If *update_dict* contains the *extra_info* field, this JSON field in the
database will be updated with the info in *extra_info*. To delete an
item from *extra_info*, pass in the special value of "__delete__" in
*extra_info* for that item.
In addition to these items received from an authnzerver client, the
payload must also include the following keys (usually added in by a
wrapping function):
- reqid: int or str
- pii_salt: str
raiseonfail : bool
If True, and something goes wrong, this will raise an Exception instead
of returning normally with a failure condition.
override_authdb_path : str or None
The SQLAlchemy database URL to use if not using the default auth DB.
config : SimpleNamespace object or None
An object containing systemwide config variables as attributes. This is
useful when the wrapping function needs to pass in some settings
directly from environment variables.
Returns
-------
dict
Returns a dict containing the new user information.
"""
engine, meta, permjson, dbpath = get_procdb_permjson(
override_authdb_path=override_authdb_path,
override_permissions_json=None,
raiseonfail=raiseonfail,
)
for key in ("reqid", "pii_salt"):
if key not in payload:
LOGGER.error(
"Missing %s in payload dict. Can't process this request." % key
)
return {
"failure_reason": (
"invalid request: missing '%s' in request" % key
),
"success": False,
"session_token": None,
"expires": None,
"messages": ["Invalid edit-user request."],
}
for key in ("target_userid", "update_dict"):
if key not in payload:
LOGGER.error(
"[%s] Invalid session edit-user request, missing %s."
% (payload["reqid"], key)
)
return {
"success": False,
"failure_reason": (
"invalid request: missing '%s' in request" % key
),
"messages": [
"Invalid edit-user request: "
"missing or invalid parameters."
],
}
target_userid = payload["target_userid"]
update_dict = payload["update_dict"]
if update_dict is None or len(update_dict) == 0:
return {
"success": False,
"failure_reason": (
"invalid request: missing 'update_dict' in request"
),
"messages": [
"Invalid user-edit request: " "missing or invalid parameters."
],
}
update_dict_keys = set(update_dict.keys())
disallowed_keys = {
"user_id",
"system_id",
"password",
"emailverify_sent_datetime",
"emailforgotpass_sent_datetime",
"emailchangepass_sent_datetime",
"last_login_success",
"last_login_try",
"failed_login_tries",
"created_on",
"last_updated",
}
leftover_keys = update_dict_keys.intersection(disallowed_keys)
if len(leftover_keys) > 0:
LOGGER.error(
"[%s] Invalid edit-user request, "
"found disallowed update keys in update_dict: %s."
% (payload["reqid"], leftover_keys)
)
return {
"success": False,
"failure_reason": (
"invalid request: disallowed keys in update_dict: %s"
% leftover_keys
),
"messages": [
"Invalid edit-user request: " "invalid update parameters."
],
}
#
# now, try to update
#
try:
users = meta.tables["users"]
sel = (
select(users.c.user_id, users.c.extra_info)
.select_from(users)
.where(users.c.user_id == target_userid)
)
with engine.begin() as conn:
result = conn.execute(sel)
userid_and_extrainfo = result.first()
if not userid_and_extrainfo or len(userid_and_extrainfo) == 0:
return {
"success": False,
"failure_reason": "no such user",
"messages": ["User info update failed."],
}
if (
"extra_info" in update_dict
and update_dict["extra_info"] is not None
):
user_extra_info = userid_and_extrainfo.extra_info
if not user_extra_info:
user_extra_info = {}
for key, val in update_dict["extra_info"].items():
if val == "__delete__" and key in user_extra_info:
del user_extra_info[key]
else:
user_extra_info[key] = val
else:
user_extra_info = userid_and_extrainfo.extra_info
# do the update
# replace the extra_info key in the update_dict since we update that
# separately
update_dict["extra_info"] = user_extra_info
with engine.begin() as conn:
upd = (
users.update()
.where(
users.c.user_id == target_userid,
)
.values(update_dict)
)
conn.execute(upd)
s = (
select(*user_info_columns(users))
.select_from(users)
.where(users.c.user_id == target_userid)
)
result = conn.execute(s)
row = result.first()
try:
serialized_result = dict(row._mapping)
LOGGER.info(
"[%s] User info updated for "
"user_id: %s."
% (
payload["reqid"],
pii_hash(
serialized_result["user_id"], payload["pii_salt"]
),
)
)
return {
"success": True,
"user_info": serialized_result,
"messages": ["User-info update successful."],
}
except Exception as e:
LOGGER.error(
"[%s] User info update failed for session token: %s. "
"Exception was: %r."
% (
payload["reqid"],
pii_hash(payload["target_userid"], payload["pii_salt"]),
e,
)
)
return {
"success": False,
"failure_reason": "user requested for update doesn't exist",
"messages": ["User info update failed."],
}
except Exception as e:
LOGGER.error(
"[%s] User info update failed for user_id: %s. "
"Exception was: %r."
% (
payload["reqid"],
pii_hash(payload["target_userid"], payload["pii_salt"]),
e,
)
)
return {
"success": False,
"failure_reason": "DB error when updating user info",
"messages": ["User info update failed."],
}
[docs]def internal_toggle_user_lock(
payload: dict,
raiseonfail: bool = False,
override_authdb_path: str = None,
config: SimpleNamespace = None,
) -> dict:
"""Locks/unlocks user accounts.
This version of the function should only be run internally (i.e. not called
by a client). The use-case is automatically locking user accounts if there
are too many incorrect password attempts. The lock can be permanent or
temporary.
Parameters
----------
payload : dict
This is the input payload dict. Required items:
- target_userid: int, the user to lock/unlock
- action: str {'unlock','lock'}
In addition to these items received from an authnzerver client, the
payload must also include the following keys (usually added in by a
wrapping function):
- reqid: int or str
- pii_salt: str
raiseonfail : bool
If True, will raise an Exception if something goes wrong.
override_authdb_path : str or None
If given as a str, is the alternative path to the auth DB.
config : SimpleNamespace object or None
An object containing systemwide config variables as attributes. This is
useful when the wrapping function needs to pass in some settings
directly from environment variables.
Returns
-------
dict
The dict returned is of the form::
{'success': True or False,
'user_info': dict, with new user info,
'messages': list of str messages if any}
"""
engine, meta, permjson, dbpath = get_procdb_permjson(
override_authdb_path=override_authdb_path,
override_permissions_json=None,
raiseonfail=raiseonfail,
)
for key in ("reqid", "pii_salt"):
if key not in payload:
LOGGER.error(
"Missing %s in payload dict. Can't process this request." % key
)
return {
"success": False,
"user_info": None,
"failure_reason": (
"invalid request: missing '%s' in request" % key
),
"messages": ["Invalid user lock toggle request."],
}
from .session import auth_delete_sessions_userid
for key in ("target_userid", "action"):
if key not in payload:
LOGGER.error(
"[%s] Invalid user lock toggle request, missing %s."
% (payload["reqid"], key)
)
return {
"success": False,
"user_info": None,
"failure_reason": (
"invalid request: missing '%s' in request" % key
),
"messages": ["No %s provided for toggle_user_lock" % key],
}
target_userid = payload["target_userid"]
action = payload["action"]
if action not in ("unlock", "lock"):
LOGGER.error(
"[%s] Invalid user lock toggle request for user_id: %s. "
"Unknown action requested: %s"
% (
payload["reqid"],
pii_hash(payload["target_userid"], payload["pii_salt"]),
action,
)
)
return {
"success": False,
"user_info": None,
"failure_reason": "action must be either 'lock' or 'unlock'",
"messages": ["Unknown action requested for toggle_user_lock."],
}
if target_userid in (2, 3):
LOGGER.error(
"[%s] Invalid user lock toggle request for user_id: %s. "
"Systemwide anonymous/locked users cannot be edited."
% (
payload["reqid"],
pii_hash(payload["target_userid"], payload["pii_salt"]),
)
)
return {
"success": False,
"user_info": None,
"failure_reason": "can't edit anonymous/locked users",
"messages": [
"Editing anonymous/locked user accounts not allowed."
],
}
try:
#
# all update checks, passed, do the update
#
users = meta.tables["users"]
#
# get the current user_role of the user and save it to
# extra_info["previous_user_roles"] in a list
#
sel = (
select(*user_info_columns(users))
.select_from(users)
.where(users.c.user_id == target_userid)
)
with engine.begin() as conn:
result = conn.execute(sel)
row = result.first()
current_user_role = row.user_role
user_extra_info = row.extra_info
previous_user_roles = user_extra_info.get(
"previous_user_roles", ["locked"]
)
# if the current user role is already locked and we get a request to
# lock the user again, don't do anything
if current_user_role == "locked" and payload["action"] == "lock":
LOGGER.warning(
"[%s] User lock toggle requested to set state to "
"locked for user_id: %s but user is "
"already locked. Ignoring... "
% (
payload["reqid"],
pii_hash(payload["target_userid"], payload["pii_salt"]),
)
)
return {
"success": True,
"user_info": dict(row._mapping),
"messages": ["User lock toggle successful."],
}
# if the current user role is already unlocked and we get a request to
# unlock the user again, don't do anything
if current_user_role != "locked" and payload["action"] == "unlock":
LOGGER.warning(
"[%s] User lock toggle requested to set state to "
"unlocked for user_id: %s but user is "
"already unlocked with user_role: %s. Ignoring... "
% (
payload["reqid"],
pii_hash(payload["target_userid"], payload["pii_salt"]),
current_user_role,
)
)
return {
"success": True,
"user_info": dict(row._mapping),
"messages": ["User lock toggle successful."],
}
# when we lock, save the current user role to
# extra_info["previous_user_roles"]
if payload["action"] == "lock":
previous_user_roles.append(current_user_role)
user_extra_info["previous_user_roles"] = previous_user_roles
LOGGER.info(
"[%s] "
"User with user_id: %s is being locked. "
"Their current user_role is: %s."
% (
payload["reqid"],
pii_hash(payload["target_userid"], payload["pii_salt"]),
previous_user_roles[-1],
)
)
update_dict = {
"is_active": False,
"user_role": "locked",
"extra_info": user_extra_info,
}
# when we unlock, get back the very last previous user role
elif payload["action"] == "unlock":
LOGGER.info(
"[%s] "
"User with user_id: %s is being unlocked. "
"Their previous user_role was: %s."
% (
payload["reqid"],
pii_hash(payload["target_userid"], payload["pii_salt"]),
previous_user_roles[-1],
)
)
update_dict = {
"is_active": True,
"user_role": previous_user_roles[-1],
}
else:
LOGGER.error(
"[%s] Invalid user lock toggle request for user_id: %s. "
"Invalid toggle action requested: '%s'."
% (
payload["reqid"],
payload["action"],
pii_hash(payload["target_userid"], payload["pii_salt"]),
)
)
return {
"success": False,
"user_info": None,
"failure_reason": "invalid lock toggle requested",
"messages": ["Invalid lock toggle action requested."],
}
# execute the update
with engine.begin() as conn:
upd = (
users.update()
.where(users.c.user_id == target_userid)
.values(update_dict)
)
conn.execute(upd)
# check the update and return new values
result = conn.execute(sel)
rows = result.first()
# delete all the sessions belonging to this user if the action to
# perform is 'lock'
if payload["action"] == "lock":
auth_delete_sessions_userid(
{
"user_id": target_userid,
"session_token": None,
"keep_current_session": False,
"pii_salt": payload["pii_salt"],
"reqid": payload["reqid"],
},
raiseonfail=raiseonfail,
override_authdb_path=override_authdb_path,
)
try:
serialized_result = dict(rows._mapping)
LOGGER.info(
"[%s] User lock toggle request succeeded for user_id: %s. "
% (
payload["reqid"],
pii_hash(payload["target_userid"], payload["pii_salt"]),
)
)
return {
"success": True,
"user_info": serialized_result,
"messages": ["User lock toggle successful."],
}
except Exception as e:
LOGGER.error(
"[%s] User lock toggle request failed for user_id: %s. "
"Exception was: %r"
% (
payload["reqid"],
pii_hash(payload["target_userid"], payload["pii_salt"]),
e,
)
)
if raiseonfail:
raise
return {
"success": False,
"user_info": None,
"failure_reason": (
"exception encountered when trying lock toggle action"
),
"messages": ["User lock toggle failed."],
}
except Exception as e:
LOGGER.error(
"[%s] User lock toggle request failed for user_id: %s. "
"Exception was: %r"
% (
payload["reqid"],
pii_hash(payload["target_userid"], payload["pii_salt"]),
e,
)
)
if raiseonfail:
raise
return {
"success": False,
"user_info": None,
"failure_reason": (
"exception encountered when trying lock toggle action"
),
"messages": ["User lock toggle failed."],
}
[docs]def toggle_user_lock(
payload: dict,
raiseonfail: bool = False,
override_authdb_path: str = None,
config: SimpleNamespace = None,
) -> dict:
"""Locks/unlocks user accounts.
Can only be run by superusers and is suitable for use when called from a
frontend.
Parameters
----------
payload : dict
This is the input payload dict. Required items:
- user_id: int, user ID of a superuser
- user_role: str, == 'superuser'
- session_token: str, session token of superuser
- target_userid: int, the user to lock/unlock
- action: str {'unlock','lock'}
In addition to these items received from an authnzerver client, the
payload must also include the following keys (usually added in by a
wrapping function):
- reqid: int or str
- pii_salt: str
raiseonfail : bool
If True, will raise an Exception if something goes wrong.
override_authdb_path : str or None
If given as a str, is the alternative path to the auth DB.
config : SimpleNamespace object or None
An object containing systemwide config variables as attributes. This is
useful when the wrapping function needs to pass in some settings
directly from environment variables.
Returns
-------
dict
The dict returned is of the form::
{'success': True or False,
'user_info': dict, with new user info,
'messages': list of str messages if any}
"""
for key in ("reqid", "pii_salt"):
if key not in payload:
LOGGER.error(
"Missing %s in payload dict. Can't process this request." % key
)
return {
"success": False,
"user_info": None,
"failure_reason": (
"invalid request: missing '%s' in request" % key
),
"messages": ["Invalid user lock toggle request."],
}
for key in (
"user_id",
"user_role",
"session_token",
"target_userid",
"action",
):
if key not in payload:
LOGGER.error(
"[%s] Invalid user lock toggle request, missing %s."
% (payload["reqid"], key)
)
return {
"success": False,
"user_info": None,
"failure_reason": (
"invalid request: missing '%s' in request" % key
),
"messages": ["No %s provided for toggle_user_lock" % key],
}
user_id = payload["user_id"]
user_role = payload["user_role"]
session_token = payload["session_token"]
target_userid = payload["target_userid"]
action = payload["action"]
# only superusers can toggle locks
if user_role != "superuser":
LOGGER.error(
"[%s] Invalid user lock toggle request "
"by user_id: %s with role: %s, "
"session_token: %s, target user_id: %s "
"User does not have a superuser role."
% (
payload["reqid"],
pii_hash(payload["user_id"], payload["pii_salt"]),
pii_hash(payload["user_role"], payload["pii_salt"]),
pii_hash(payload["session_token"], payload["pii_salt"]),
pii_hash(payload["target_userid"], payload["pii_salt"]),
)
)
return {
"success": False,
"user_info": None,
"failure_reason": (
"user role is not 'superuser', required to toggle locks"
),
"messages": ["You don't have lock/unlock privileges."],
}
# don't lock the calling user out
if target_userid == user_id:
LOGGER.error(
"[%s] Invalid user lock toggle request "
"by user_id: %s with role: %s, "
"session_token: %s, target user_id: %s "
"User attempted to toggle lock on their own account."
% (
payload["reqid"],
pii_hash(payload["user_id"], payload["pii_salt"]),
pii_hash(payload["user_role"], payload["pii_salt"]),
pii_hash(payload["session_token"], payload["pii_salt"]),
pii_hash(payload["target_userid"], payload["pii_salt"]),
)
)
return {
"success": False,
"user_info": None,
"failure_reason": "can't toggle a lock state on self",
"messages": ["You can't lock/unlock your own user account."],
}
# unknown action attempted
if action not in ("unlock", "lock"):
LOGGER.error(
"[%s] Invalid user lock toggle request "
"by user_id: %s with role: %s, "
"session_token: %s, target user_id: %s "
"Unknown action requested: %s"
% (
payload["reqid"],
pii_hash(payload["user_id"], payload["pii_salt"]),
pii_hash(payload["user_role"], payload["pii_salt"]),
pii_hash(payload["session_token"], payload["pii_salt"]),
pii_hash(payload["target_userid"], payload["pii_salt"]),
action,
)
)
return {
"success": False,
"user_info": None,
"failure_reason": "action must be one of 'lock', 'unlock'",
"messages": ["Unknown action requested for toggle_user_lock."],
}
# attempt to edit systemwide accounts
if target_userid in (2, 3):
LOGGER.error(
"[%s] Invalid user lock toggle request "
"by user_id: %s with role: %s, "
"session_token: %s, target user_id: %s "
"Systemwide anonymous/locked accounts can't be edited."
% (
payload["reqid"],
pii_hash(payload["user_id"], payload["pii_salt"]),
pii_hash(payload["user_role"], payload["pii_salt"]),
pii_hash(payload["session_token"], payload["pii_salt"]),
pii_hash(payload["target_userid"], payload["pii_salt"]),
)
)
return {
"success": False,
"user_info": None,
"failure_reason": (
"can't toggle lock state for system anonymous/locked accounts"
),
"messages": [
"Editing anonymous/locked user accounts not allowed."
],
}
#
# finally, process the attempt
#
try:
# check if session_token is valid and belongs to user_id
session_info = auth_session_exists(
{
"session_token": session_token,
"pii_salt": payload["pii_salt"],
"reqid": payload["reqid"],
},
raiseonfail=raiseonfail,
override_authdb_path=override_authdb_path,
)
# check if the session info user_id matches the provided user_id and
# role
if not (
session_info
and session_info["success"]
and session_info["session_info"]["is_active"] is True
and session_info["session_info"]["user_id"] == user_id
and session_info["session_info"]["user_role"] == user_role
):
LOGGER.error(
"[%s] Invalid user lock toggle request "
"by user_id: %s with role: %s, "
"session_token: %s, target user_id: %s "
"Session token does not match the expected user ID or role."
% (
payload["reqid"],
pii_hash(payload["user_id"], payload["pii_salt"]),
pii_hash(payload["user_role"], payload["pii_salt"]),
pii_hash(payload["session_token"], payload["pii_salt"]),
pii_hash(payload["target_userid"], payload["pii_salt"]),
)
)
return {
"success": False,
"user_info": None,
"failure_reason": (
"invalid session for user attempting lock toggle"
),
"messages": [
"Superuser session info not available "
"for this user edit attempt."
],
}
#
# all update checks passed, do the update
#
res = internal_toggle_user_lock(
payload,
raiseonfail=raiseonfail,
override_authdb_path=override_authdb_path,
)
return res
except Exception as e:
LOGGER.error(
"[%s] Invalid user lock toggle request "
"by user_id: %s with role: %s, "
"session_token: %s, target user_id: %s "
"Exception was: %r."
% (
payload["reqid"],
pii_hash(payload["user_id"], payload["pii_salt"]),
pii_hash(payload["user_role"], payload["pii_salt"]),
pii_hash(payload["session_token"], payload["pii_salt"]),
pii_hash(payload["target_userid"], payload["pii_salt"]),
e,
)
)
if raiseonfail:
raise
return {
"success": False,
"user_info": None,
"failure_reason": "exception when trying to toggle lock state",
"messages": ["User lock toggle failed."],
}