# -*- coding: utf-8 -*-
# actions_session.py - Waqas Bhatti (wbhatti@astro.princeton.edu) - Aug 2018
# License: MIT - see the LICENSE file for the full text.
"""This contains functions to drive session-related auth actions.
"""
#############
## LOGGING ##
#############
import logging
from types import SimpleNamespace
# get a logger
LOGGER = logging.getLogger(__name__)
#############
## IMPORTS ##
#############
try:
from datetime import timezone, timedelta
utc = timezone.utc
except Exception:
from datetime import timedelta, tzinfo
ZERO = timedelta(0)
class UTC(tzinfo):
"""UTC"""
def utcoffset(self, dt):
return ZERO
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return ZERO
utc = UTC()
from sqlalchemy import select
from argon2 import PasswordHasher
from ..permissions import pii_hash
from authnzerver.actions.utils import get_procdb_permjson
from .session import auth_session_exists
############################
## PASSWORD HASHER OBJECT ##
############################
pass_hasher = PasswordHasher()
############################################
## USER PASSWORD CHECK HANDLING FUNCTIONS ##
############################################
[docs]def auth_password_check(
payload: dict,
override_authdb_path: str = None,
raiseonfail: bool = False,
config: SimpleNamespace = None,
) -> dict:
"""This runs a password check given a session token and password.
Used to gate high-security areas or operations that require re-verification
of the password for a user's existing session.
Parameters
----------
payload : dict
This is a dict containing the following items:
- session_token
- password
In addition to these items received from an authnzerver client, the
payload must also include the following keys (usually added in by a
wrapping function):
- reqid: int or str
- pii_salt: str
override_authdb_path : str or None
The SQLAlchemy database URL to use if not using the default auth DB.
raiseonfail : bool
If True, and something goes wrong, this will raise an Exception instead
of returning normally with a failure condition.
config : SimpleNamespace object or None
An object containing systemwide config variables as attributes. This is
useful when the wrapping function needs to pass in some settings
directly from environment variables.
Returns
-------
dict
Returns a dict containing the result of the password verification
check.
"""
engine, meta, permjson, dbpath = get_procdb_permjson(
override_authdb_path=override_authdb_path,
override_permissions_json=None,
raiseonfail=raiseonfail,
)
for key in ("reqid", "pii_salt"):
if key not in payload:
LOGGER.error(
"Missing %s in payload dict. Can't process this request." % key
)
return {
"success": False,
"failure_reason": (
"invalid request: missing '%s' in request" % key
),
"user_id": None,
"messages": ["Invalid password check request."],
}
# check broken request
request_ok = True
for item in ("password", "session_token"):
if item not in payload:
request_ok = False
break
# this checks if the database connection is live
users = meta.tables["users"]
#
# check if the request is OK
#
# if it isn't, then hash the dummy user's password twice
if not request_ok:
# dummy session request
auth_session_exists(
{
"session_token": "nope",
"reqid": payload["reqid"],
"pii_salt": payload["pii_salt"],
},
raiseonfail=raiseonfail,
override_authdb_path=override_authdb_path,
)
# get the dummy user's password from the DB on an outright failure -
# run this twice to match the number of verifications for a normal
# successful user
dummy_sel = (
select(users.c.password)
.select_from(users)
.where(users.c.user_id == 3)
)
with engine.begin() as conn:
for _ in range(2):
dummy_results = conn.execute(dummy_sel)
dummy_password = dummy_results.scalar()
try:
pass_hasher.verify(dummy_password, "nope")
except Exception:
pass
LOGGER.error(
"[%s] Password check failed for session_token: %s. "
"Missing request items."
% (
payload["reqid"],
pii_hash(payload["session_token"], payload["pii_salt"]),
)
)
return {
"success": False,
"failure_reason": (
"invalid request: missing either 'password' or 'session_token'"
),
"user_id": None,
"messages": ["Invalid password verification request."],
}
# otherwise, now we'll check if the session exists
else:
session_info = auth_session_exists(
{
"session_token": payload["session_token"],
"reqid": payload["reqid"],
"pii_salt": payload["pii_salt"],
},
raiseonfail=raiseonfail,
override_authdb_path=override_authdb_path,
)
# if it doesn't, hash the dummy password twice
if not session_info["success"]:
# get the dummy user's password from the DB on an outright failure
# run this twice to match the number of verifications for a normal
# successful user
dummy_sel = (
select(users.c.password)
.select_from(users)
.where(users.c.user_id == 3)
)
with engine.begin() as conn:
for _ in range(2):
dummy_results = conn.execute(dummy_sel)
dummy_password = dummy_results.scalar()
try:
pass_hasher.verify(dummy_password, "nope")
except Exception:
pass
LOGGER.error(
"[%s] Password check failed for session_token: %s. "
"The session token provided does not exist."
% (
payload["reqid"],
pii_hash(payload["session_token"], payload["pii_salt"]),
)
)
return {
"success": False,
"failure_reason": "session does not exist",
"user_id": None,
"messages": ["No session token provided."],
}
# if the session token does exist, we'll proceed to checking the
# password for the provided email
else:
with engine.begin() as conn:
# always get the dummy user's password from the DB
dummy_sel = (
select(users.c.password)
.select_from(users)
.where(users.c.user_id == 3)
)
dummy_results = conn.execute(dummy_sel)
dummy_password = dummy_results.scalar()
try:
pass_hasher.verify(dummy_password, "nope")
except Exception:
pass
# look up the actual provided user
user_sel = (
select(
users.c.user_id,
users.c.password,
users.c.is_active,
users.c.user_role,
)
.select_from(users)
.where(
users.c.user_id
== session_info["session_info"]["user_id"]
)
)
user_results = conn.execute(user_sel)
user_info = user_results.first()
if user_info:
try:
pass_ok = pass_hasher.verify(
user_info.password,
payload["password"][:256],
)
except Exception as e:
LOGGER.error(
"[%s] Password check failed for session_token: %s. "
"The password provided does not match the one on "
"record for user_id: %s. Exception was: %r"
% (
payload["reqid"],
pii_hash(
payload["session_token"], payload["pii_salt"]
),
pii_hash(user_info.user_id, payload["pii_salt"]),
e,
)
)
pass_ok = False
else:
try:
pass_hasher.verify(dummy_password, "nope")
except Exception:
pass
pass_ok = False
if not pass_ok:
return {
"success": False,
"failure_reason": (
"user does not exist or password doesn't match"
),
"user_id": None,
"messages": [
"Sorry, that user ID and "
"password combination didn't work."
],
}
# if password verification succeeeded, check if the user can
# actually log in (i.e. their account is not locked or is not
# inactive)
else:
# if the user account is active and unlocked, proceed.
# the frontend will take this user_id and ask for a new session
# token with it.
if user_info.is_active and user_info.user_role != "locked":
LOGGER.info(
"[%s] Password check successful for "
"session_token: %s. "
"Matched user with user_id: %s. "
% (
payload["reqid"],
pii_hash(
payload["session_token"], payload["pii_salt"]
),
pii_hash(user_info.user_id, payload["pii_salt"]),
)
)
return {
"success": True,
"user_id": user_info.user_id,
"user_role": user_info.user_role,
"messages": ["Verification successful."],
}
# if the user account is locked, return a failure
else:
LOGGER.error(
"[%s] Password check failed for session_token: %s. "
"Matched user with user_id: %s is not active "
"or is locked."
% (
payload["reqid"],
pii_hash(
payload["session_token"], payload["pii_salt"]
),
pii_hash(user_info.user_id, payload["pii_salt"]),
)
)
return {
"success": False,
"failure_reason": "user exists but is inactive",
"user_id": user_info.user_id,
"messages": [
"Sorry, that user ID and "
"password combination didn't work."
],
}
[docs]def auth_password_check_nosession(
payload: dict,
override_authdb_path: str = None,
raiseonfail: bool = False,
config: SimpleNamespace = None,
) -> dict:
"""This runs a password check given an email address and password.
Used to gate high-security areas or operations that require re-verification
of the password for a user, without checking if they have a session.
Useful for APIs, where the 'password' is some API token.
Parameters
----------
payload : dict
This is a dict containing the following items:
- email
- password
In addition to these items received from an authnzerver client, the
payload must also include the following keys (usually added in by a
wrapping function):
- reqid: int or str
- pii_salt: str
override_authdb_path : str or None
The SQLAlchemy database URL to use if not using the default auth DB.
raiseonfail : bool
If True, and something goes wrong, this will raise an Exception instead
of returning normally with a failure condition.
config : SimpleNamespace object or None
An object containing systemwide config variables as attributes. This is
useful when the wrapping function needs to pass in some settings
directly from environment variables.
Returns
-------
dict
Returns a dict containing the result of the password verification
check.
"""
engine, meta, permjson, dbpath = get_procdb_permjson(
override_authdb_path=override_authdb_path,
override_permissions_json=None,
raiseonfail=raiseonfail,
)
for key in ("reqid", "pii_salt"):
if key not in payload:
LOGGER.error(
"Missing %s in payload dict. Can't process this request." % key
)
return {
"success": False,
"failure_reason": (
"invalid request: missing '%s' in request" % key
),
"user_id": None,
"messages": ["Invalid password check request."],
}
# check broken request
request_ok = True
for item in ("password", "email"):
if item not in payload:
request_ok = False
break
users = meta.tables["users"]
#
# check if the request is OK
#
# if it isn't, then hash the dummy user's password twice
if not request_ok:
# get the dummy user's password from the DB on an outright failure -
# run this twice to match the number of verifications for a normal
# successful user
dummy_sel = (
select(users.c.password)
.select_from(users)
.where(users.c.user_id == 3)
)
with engine.begin() as conn:
for _ in range(2):
dummy_results = conn.execute(dummy_sel)
dummy_password = dummy_results.scalar()
try:
pass_hasher.verify(dummy_password, "nope")
except Exception:
pass
LOGGER.error(
"[%s] Password check failed for email: %s. "
"Missing request items."
% (
payload["reqid"],
pii_hash(payload["email"], payload["pii_salt"]),
)
)
return {
"success": False,
"failure_reason": (
"invalid request: missing 'email' or 'password' in request"
),
"user_id": None,
"messages": ["Invalid password verification request."],
}
# otherwise, now we'll check if the user exists and the password is correct
else:
with engine.begin() as conn:
# always get the dummy user's password from the DB
dummy_sel = (
select(users.c.password)
.select_from(users)
.where(users.c.user_id == 3)
)
dummy_results = conn.execute(dummy_sel)
dummy_password = dummy_results.scalar()
try:
pass_hasher.verify(dummy_password, "nope")
except Exception:
pass
# look up the actual provided user
user_sel = (
select(
users.c.user_id,
users.c.password,
users.c.is_active,
users.c.user_role,
)
.select_from(users)
.where(users.c.email == payload["email"])
)
user_results = conn.execute(user_sel)
user_info = user_results.first()
pass_ok = False
if user_info:
try:
pass_ok = pass_hasher.verify(
user_info.password,
payload["password"][:256],
)
except Exception as e:
LOGGER.error(
"[%s] Password check failed for email: %s. "
"The password provided does not match the one on "
"record for user_id: %s. Exception was: %r"
% (
payload["reqid"],
pii_hash(payload["email"], payload["pii_salt"]),
pii_hash(user_info.user_id, payload["pii_salt"]),
e,
)
)
pass_ok = False
# if the user doesn't exist, do a dummy pass hash
else:
try:
pass_hasher.verify(dummy_password, "nope")
except Exception:
pass
pass_ok = False
if not pass_ok:
return {
"success": False,
"failure_reason": (
"user does not exist or password doesn't match"
),
"user_id": None,
"messages": [
"Sorry, that user ID and "
"password combination didn't work."
],
}
# if password verification succeeeded, check if the user can
# actually log in (i.e. their account is not locked or is not
# inactive)
else:
# if the user account is active and unlocked, proceed.
# the frontend will take this user_id and ask for a new session
# token with it.
if user_info.is_active and user_info.user_role != "locked":
LOGGER.info(
"[%s] Password check successful for email: %s. "
"Matched user with user_id: %s. "
% (
payload["reqid"],
pii_hash(payload["email"], payload["pii_salt"]),
pii_hash(user_info.user_id, payload["pii_salt"]),
)
)
return {
"success": True,
"user_id": user_info.user_id,
"user_role": user_info.user_role,
"messages": ["Verification successful."],
}
# if the user account is locked, return a failure
else:
LOGGER.error(
"[%s] Password check failed for email: %s. "
"Matched user with user_id: %s is not active "
"or is locked."
% (
payload["reqid"],
pii_hash(payload["email"], payload["pii_salt"]),
pii_hash(user_info.user_id, payload["pii_salt"]),
)
)
return {
"success": False,
"failure_reason": "user exists but is inactive",
"user_id": user_info.user_id,
"messages": [
"Sorry, that user ID and "
"password combination didn't work."
],
}