# Source code for authnzerver.actions.apikey

# -*- coding: utf-8 -*-
# actions_apikey.py - Waqas Bhatti (wbhatti@astro.princeton.edu) - Aug 2018
# License: MIT - see the LICENSE file for the full text.

"""This contains functions to drive API key related auth actions.

"""

#############
## LOGGING ##
#############

import logging
import json
from types import SimpleNamespace

# get a logger
LOGGER = logging.getLogger(__name__)


#############
## IMPORTS ##
#############

# Bind ``utc`` to a UTC tzinfo object. ``datetime.timezone`` is used when it
# can be imported; otherwise a minimal hand-written tzinfo subclass is used.
try:

    from datetime import datetime, timezone, timedelta

    utc = timezone.utc

except ImportError:
    # Only a failed import can land here, so catch exactly that instead of a
    # blanket ``Exception`` that could hide unrelated errors.

    from datetime import datetime, timedelta, tzinfo

    ZERO = timedelta(0)

    class UTC(tzinfo):
        """Fixed-offset tzinfo with a zero UTC offset and no DST."""

        def utcoffset(self, dt):
            """Return the offset from UTC, which is always zero."""
            return ZERO

        def tzname(self, dt):
            """Return the name of this time zone."""
            return "UTC"

        def dst(self, dt):
            """Return the DST adjustment, which is always zero."""
            return ZERO

    utc = UTC()

import secrets

from sqlalchemy import select, insert

from .session import auth_session_exists
from ..permissions import pii_hash
from .access import check_user_access
from authnzerver.actions.utils import get_procdb_permjson


######################
## API KEY HANDLING ##
######################


def _apikey_request_log_args(payload: dict) -> tuple:
    """Build the positional log arguments describing an API key request.

    The user ID, session token, IP address, and user agent are run through
    ``pii_hash`` with the payload's ``pii_salt``; the request ID, user role,
    audience, subject, and API version are passed through unchanged.

    """
    salt = payload["pii_salt"]
    return (
        payload["reqid"],
        pii_hash(payload["user_id"], salt),
        pii_hash(payload["session_token"], salt),
        payload["user_role"],
        pii_hash(payload["ip_address"], salt),
        pii_hash(payload["user_agent"], salt),
        payload["audience"],
        payload["subject"],
        payload["apiversion"],
    )


def issue_apikey(
    payload: dict,
    raiseonfail: bool = False,
    override_authdb_path: str = None,
    override_permissions_json: str = None,
    config: SimpleNamespace = None,
) -> dict:
    """Issues a new API key.

    Parameters
    ----------
    payload : dict
        The payload dict must have the following keys:

        - issuer: str, the entity that will be designated as the API key
          issuer
        - audience: str, the service this API key is being issued for
        - subject: str, the specific API endpoint API key is being issued for
        - apiversion: int or str, the API version that the API key is valid
          for
        - expires_days: int, the number of days after which the API key will
          expire
        - not_valid_before: float or int, the amount of seconds after
          utcnow() when the API key becomes valid
        - user_id: int, the user ID of the user requesting the API key
        - user_role: str, the user role of the user requesting the API key
        - ip_address: str, the IP address to tie the API key to
        - user_agent: str, the browser user agent requesting the API key
        - session_token: str, the session token of the user requesting the API
          key

        In addition, the payload must contain the ``reqid`` and ``pii_salt``
        items, which are used in log messages and forwarded to the access
        and session checks.

    raiseonfail : bool
        If True, will raise an Exception if something goes wrong.

    override_authdb_path : str or None
        If given as a str, is the alternative path to the auth DB.

    override_permissions_json : str or None
        If given as a str, is the alternative path to the permissions JSON to
        use. This is used to check if the user_id is allowed to actually request
        an API key.

    config : SimpleNamespace object or None
        An object containing systemwide config variables as attributes. This is
        useful when the wrapping function needs to pass in some settings
        directly from environment variables.

    Returns
    -------
    dict
        The dict returned is of the form::

            {'success': True or False,
             'apikey': JSON str of the API key dict, or None on failure,
             'expires': expiry datetime in ISO format, or None on failure,
             'failure_reason': str, only present on failure,
             'messages': list of str messages if any}

    Notes
    -----
    API keys are tied to an IP address and client header combination.

    The 'apikey' item returned is the JSON-serialized dict with all the API
    key information. This JSON should be encrypted and time-stamp signed by
    the frontend as the final "API key", and finally sent back to the client.

    """

    engine, meta, permjson, dbpath = get_procdb_permjson(
        override_authdb_path=override_authdb_path,
        override_permissions_json=override_permissions_json,
        raiseonfail=raiseonfail,
    )

    for key in ("reqid", "pii_salt"):
        if key not in payload:
            LOGGER.error(
                "Missing %s in payload dict. Can't process this request.", key
            )
            return {
                "success": False,
                "apikey": None,
                "expires": None,
                "failure_reason": (
                    "invalid request: missing '%s' in request" % key
                ),
                "messages": ["Invalid API key request."],
            }

    # a tuple (rather than a set) keeps the check order, and therefore the
    # key named in the failure message, deterministic
    for key in (
        "user_id",
        "user_role",
        "expires_days",
        "not_valid_before",
        "issuer",
        "audience",
        "subject",
        "ip_address",
        "user_agent",
        "session_token",
        "apiversion",
    ):
        if key not in payload:
            LOGGER.error(
                "[%s] Invalid API key request, missing %s.",
                payload["reqid"],
                key,
            )
            return {
                "success": False,
                "apikey": None,
                "expires": None,
                "failure_reason": (
                    "invalid request: missing '%s' in request" % key
                ),
                "messages": ["Some required keys are missing from payload."],
            }

    # check if the provided user_id and role can actually create an API key
    user_id = payload["user_id"]
    user_role = payload["user_role"]

    apikey_creation_allowed = check_user_access(
        {
            "user_id": user_id,
            "user_role": user_role,
            "action": "create",
            "target_name": "apikey",
            "target_owner": user_id,
            "target_visibility": "private",
            "target_sharedwith": None,
            "reqid": payload["reqid"],
            "pii_salt": payload["pii_salt"],
        },
        raiseonfail=raiseonfail,
        override_permissions_json=override_permissions_json,
        override_authdb_path=override_authdb_path,
    )

    if not apikey_creation_allowed["success"]:
        LOGGER.error(
            "[%s] Invalid API key issuance request. "
            "from user_id: %s, role: '%s'. "
            "The user is not allowed to create an API key.",
            payload["reqid"],
            pii_hash(user_id, payload["pii_salt"]),
            pii_hash(user_role, payload["pii_salt"]),
        )
        # include 'apikey' and 'expires' so this failure has the same shape
        # as every other failure returned by this function
        return {
            "success": False,
            "apikey": None,
            "expires": None,
            "failure_reason": "user not allowed to issue API key",
            "messages": [
                "API key issuance failed. "
                "You are not allowed to issue an API key."
            ],
        }

    # check the session
    session_info = auth_session_exists(
        {
            "session_token": payload["session_token"],
            "pii_salt": payload["pii_salt"],
            "reqid": payload["reqid"],
        },
        raiseonfail=raiseonfail,
        override_authdb_path=override_authdb_path,
    )

    if not session_info["success"]:
        LOGGER.error(
            "[%s] Invalid API key request. "
            "user_id: %s, session_token: %s, role: '%s', "
            "ip_address: %s, user_agent: %s requested an API key for "
            "audience: '%s', subject: '%s', apiversion: %s. "
            "Session token of requestor was not found in the DB.",
            *_apikey_request_log_args(payload),
        )
        return {
            "success": False,
            "apikey": None,
            "expires": None,
            "failure_reason": (
                "invalid session for user requesting API key issuance"
            ),
            "messages": (
                ["Invalid session token for API key issuance request."]
            ),
        }

    session = session_info["session_info"]

    # check if the session info matches what we have in the payload
    session_ok = (
        (session["user_id"] == payload["user_id"])
        and (session["ip_address"] == payload["ip_address"])
        and (session["user_agent"] == payload["user_agent"])
        and (session["user_role"] == payload["user_role"])
    )

    if not session_ok:
        LOGGER.error(
            "[%s] Invalid API key request. "
            "user_id: %s, session_token: %s, role: '%s', "
            "ip_address: %s, user_agent: %s requested an API key for "
            "audience: '%s', subject: '%s', apiversion: '%s'. "
            "Session token info of requestor does not match payload info.",
            *_apikey_request_log_args(payload),
        )
        return {
            "success": False,
            "apikey": None,
            "expires": None,
            "failure_reason": (
                "invalid session for user requesting API key issuance"
            ),
            "messages": (
                [
                    "DB session user_id, ip_address, user_agent, "
                    "user_role does not match provided session info."
                ]
            ),
        }

    #
    # finally, generate the API key
    #
    random_token = secrets.token_urlsafe(32)

    # we'll return this API key dict to the frontend so it can JSON dump it,
    # encode to bytes, then encrypt, then sign it, and finally send back to the
    # client

    # NOTE: these are naive datetimes in UTC; they are stored as-is in the
    # apikeys table below
    issued = datetime.utcnow()

    # a non-numeric or out-of-range expires_days / not_valid_before makes
    # timedelta (or the addition) raise; report that as a failed request
    # unless the caller asked for exceptions
    try:
        expires = issued + timedelta(days=payload["expires_days"])
        notvalidbefore = issued + timedelta(
            seconds=payload["not_valid_before"]
        )
    except (TypeError, ValueError, OverflowError):
        LOGGER.error(
            "[%s] Invalid API key request, 'expires_days' or "
            "'not_valid_before' is not a usable number.",
            payload["reqid"],
        )
        if raiseonfail:
            raise
        return {
            "success": False,
            "apikey": None,
            "expires": None,
            "failure_reason": (
                "invalid request: 'expires_days' or 'not_valid_before' "
                "is not a usable number"
            ),
            "messages": ["Invalid API key request."],
        }

    apikey_dict = {
        "iss": payload["issuer"],
        "ver": payload["apiversion"],
        "uid": payload["user_id"],
        "rol": payload["user_role"],
        "usa": payload["user_agent"],
        "aud": payload["audience"],
        "sub": payload["subject"],
        "ipa": payload["ip_address"],
        "tkn": random_token,
        "iat": issued.isoformat(),
        "nbf": notvalidbefore.isoformat(),
        "exp": expires.isoformat(),
    }
    apikey_json = json.dumps(apikey_dict)

    # we'll also store this dict in the apikeys table
    apikeys = meta.tables["apikeys"]

    # NOTE: we store only the random token. this will later be checked for
    # equality against the value stored in the API key dict['tkn'] when we send
    # in this API key for verification later
    ins = insert(apikeys).values(
        {
            "apikey": random_token,
            "issued": issued,
            "expires": expires,
            "not_valid_before": notvalidbefore,
            "user_id": payload["user_id"],
            "user_role": payload["user_role"],
            "session_token": payload["session_token"],
        }
    )

    with engine.begin() as conn:
        conn.execute(ins)

    #
    # return the API key to the frontend
    #
    LOGGER.info(
        "[%s] API key request successful. "
        "user_id: %s, session_token: %s, role: '%s', "
        "ip_address: %s, user_agent: %s requested an API key for "
        "audience: '%s', subject: '%s', apiversion: '%s'. "
        "API key not valid before: %s, expires on: %s.",
        *_apikey_request_log_args(payload),
        notvalidbefore.isoformat(),
        expires.isoformat(),
    )

    messages = (
        "API key generated successfully, expires: %s." % expires.isoformat()
    )

    return {
        "success": True,
        "apikey": apikey_json,
        "expires": expires.isoformat(),
        "messages": ([messages]),
    }


def verify_apikey(
    payload: dict,
    raiseonfail: bool = False,
    override_authdb_path: str = None,
    override_permissions_json: str = None,
    config: SimpleNamespace = None,
) -> dict:
    """Checks if an API key is valid.

    Parameters
    ----------
    payload : dict
        This dict contains the following keys:

        - apikey_dict: the decrypted and verified API key info dict from the
          frontend. It must contain the 'uid', 'rol', 'tkn', 'aud', 'sub',
          'ver', and 'exp' items.
        - user_id: the user ID of the person wanting to verify this key.
        - user_role: the user role of the person wanting to verify this key.

        In addition, the payload must contain the ``reqid`` and ``pii_salt``
        items, which are used in log messages and forwarded to the access
        check.

    raiseonfail : bool
        If True, will raise an Exception if something goes wrong.

    override_authdb_path : str or None
        If given as a str, is the alternative path to the auth DB.

    override_permissions_json : str or None
        If given as a str, is the alternative path to the permissions JSON to
        use. This is used to check if the user_id is allowed to actually verify
        ("read") an API key.

    config : SimpleNamespace object or None
        An object containing systemwide config variables as attributes. This is
        useful when the wrapping function needs to pass in some settings
        directly from environment variables.

    Returns
    -------
    dict
        The dict returned is of the form::

            {'success': True if API key is OK and False otherwise,
             'failure_reason': str, only present on failure,
             'messages': list of str messages if any}

    """

    # NOTE(review): override_permissions_json is not forwarded to
    # get_procdb_permjson here (it is forwarded to check_user_access below);
    # left unchanged -- confirm whether that is intentional
    engine, meta, permjson, dbpath = get_procdb_permjson(
        override_authdb_path=override_authdb_path,
        override_permissions_json=None,
        raiseonfail=raiseonfail,
    )

    for key in ("reqid", "pii_salt"):
        if key not in payload:
            LOGGER.error(
                "Missing %s in payload dict. Can't process this request.", key
            )
            return {
                "success": False,
                "apikey": None,
                "expires": None,
                "failure_reason": (
                    "invalid request: missing '%s' from request" % key
                ),
                "messages": ["Invalid API key request."],
            }

    for key in ("apikey_dict", "user_id", "user_role"):
        if key not in payload:
            LOGGER.error(
                "[%s] Invalid API key request, missing %s.",
                payload["reqid"],
                key,
            )
            return {
                "success": False,
                "failure_reason": (
                    "invalid request: missing '%s' from request" % key
                ),
                "messages": ["Some required keys are missing from payload."],
            }

    apikey_dict = payload["apikey_dict"]
    user_id = payload["user_id"]
    user_role = payload["user_role"]

    # every one of these items is read below (access check, DB query, or log
    # message), so reject a malformed key dict up front instead of failing
    # with a KeyError/TypeError partway through
    required_items = ("uid", "rol", "tkn", "aud", "sub", "ver", "exp")
    if not isinstance(apikey_dict, dict) or any(
        item not in apikey_dict for item in required_items
    ):
        LOGGER.error(
            "[%s] Invalid API key verification request, "
            "the provided API key dict is malformed.",
            payload["reqid"],
        )
        return {
            "success": False,
            "failure_reason": (
                "invalid request: malformed 'apikey_dict' in request"
            ),
            "messages": ["API key could not be verified."],
        }

    # check if the user is allowed to read the presented API key
    apikey_verify_allowed = check_user_access(
        {
            "user_id": user_id,
            "user_role": user_role,
            "action": "view",
            "target_name": "apikey",
            "target_owner": apikey_dict["uid"],
            "target_visibility": "private",
            "target_sharedwith": None,
            "reqid": payload["reqid"],
            "pii_salt": payload["pii_salt"],
        },
        raiseonfail=raiseonfail,
        override_permissions_json=override_permissions_json,
        override_authdb_path=override_authdb_path,
    )

    if not apikey_verify_allowed["success"]:
        LOGGER.error(
            "[%s] Invalid API key verification request. "
            "from user_id: %s, role: %s. The API key presented is "
            "not readable by this user.",
            payload["reqid"],
            pii_hash(user_id, payload["pii_salt"]),
            pii_hash(user_role, payload["pii_salt"]),
        )
        return {
            "success": False,
            "failure_reason": (
                "originating user is not allowed to operate on this API key"
            ),
            "messages": [
                "API key verification failed. "
                "You are not allowed to operate on this API key."
            ],
        }

    apikeys = meta.tables["apikeys"]

    # the apikey sent to us must match the stored apikey's properties:
    # - token
    # - userid
    # - user role
    # - expired must be in the future
    # - issued must be in the past
    # - not_valid_before must be in the past
    dt_utcnow = datetime.utcnow()

    sel = (
        select(
            apikeys.c.apikey,
            apikeys.c.expires,
        )
        .select_from(apikeys)
        .where(apikeys.c.apikey == apikey_dict["tkn"])
        .where(apikeys.c.user_id == apikey_dict["uid"])
        .where(apikeys.c.user_role == apikey_dict["rol"])
        .where(apikeys.c.expires > dt_utcnow)
        .where(apikeys.c.issued < dt_utcnow)
        .where(apikeys.c.not_valid_before < dt_utcnow)
    )
    with engine.begin() as conn:
        result = conn.execute(sel)
        row = result.first()

    # these items are logged for both outcomes
    key_log_args = (
        payload["reqid"],
        pii_hash(apikey_dict["uid"], payload["pii_salt"]),
        apikey_dict["rol"],
        apikey_dict["aud"],
        apikey_dict["sub"],
        apikey_dict["ver"],
        apikey_dict["exp"],
    )

    # a matching row always carries the two selected columns, so checking
    # for None is sufficient
    if row is not None:

        LOGGER.info(
            "[%s] API key verified successfully. "
            "user_id: %s, role: '%s', audience: '%s', subject: '%s', "
            "apiversion: %s, expires on: %s",
            *key_log_args,
        )

        return {
            "success": True,
            "messages": [
                (
                    "API key verified successfully. Expires: %s."
                    % row.expires.isoformat()
                )
            ],
        }

    LOGGER.error(
        "[%s] API key verification failed. Failed key "
        "user_id: %s, role: '%s', audience: '%s', subject: '%s', "
        "apiversion: %s, expires on: %s",
        *key_log_args,
    )

    return {
        "success": False,
        "failure_reason": (
            "key validation failed, "
            "provided key does not match stored key or has expired"
        ),
        "messages": ["API key could not be verified."],
    }


def revoke_apikey(
    payload: dict,
    raiseonfail: bool = False,
    override_authdb_path: str = None,
    override_permissions_json: str = None,
    config: SimpleNamespace = None,
) -> dict:
    """Revokes an API key.

    Parameters
    ----------
    payload : dict
        This dict contains the following keys:

        - apikey_dict: the decrypted and verified API key info dict from the
          frontend. It must contain the 'uid', 'rol', and 'tkn' items.
        - user_id: the user ID of the person revoking this key. Only superusers
          or staff can revoke an API key that doesn't belong to them.
        - user_role: the user role of the person revoking this key. Only
          superusers or staff can revoke an API key that doesn't belong to
          them.

        In addition, the payload must contain the ``reqid`` and ``pii_salt``
        items, which are used in log messages and forwarded to the access
        check.

    raiseonfail : bool
        If True, will raise an Exception if something goes wrong.

    override_authdb_path : str or None
        If given as a str, is the alternative path to the auth DB.

    override_permissions_json : str or None
        If given as a str, is the alternative path to the permissions JSON to
        use. This is used to check if the user_id is allowed to actually revoke
        ("delete") an API key.

    config : SimpleNamespace object or None
        An object containing systemwide config variables as attributes. This is
        useful when the wrapping function needs to pass in some settings
        directly from environment variables.

    Returns
    -------
    dict
        The dict returned is of the form::

            {'success': True if API key was revoked and False otherwise,
             'messages': list of str messages if any}

        Failures detected before the delete is attempted also carry a
        'failure_reason' str.

    """

    # NOTE(review): override_permissions_json is not forwarded to
    # get_procdb_permjson here (it is forwarded to check_user_access below);
    # left unchanged -- confirm whether that is intentional
    engine, meta, permjson, dbpath = get_procdb_permjson(
        override_authdb_path=override_authdb_path,
        override_permissions_json=None,
        raiseonfail=raiseonfail,
    )

    for key in ("reqid", "pii_salt"):
        if key not in payload:
            LOGGER.error(
                "Missing %s in payload dict. Can't process this request.", key
            )
            return {
                "success": False,
                "failure_reason": (
                    "invalid request: missing '%s' from request" % key
                ),
                "messages": ["Invalid API key revocation request."],
            }

    for key in ("apikey_dict", "user_id", "user_role"):
        # test the loop variable itself: testing the literal "apikey_dict"
        # here would let a missing user_id or user_role slip through and
        # raise a KeyError further down
        if key not in payload:
            LOGGER.error(
                "[%s] Invalid API key revocation request, missing %s.",
                payload["reqid"],
                key,
            )
            return {
                "success": False,
                "failure_reason": (
                    "invalid request: missing '%s' from request" % key
                ),
                "messages": ["Some required keys are missing from payload."],
            }

    apikey_dict = payload["apikey_dict"]
    user_id = payload["user_id"]
    user_role = payload["user_role"]

    # these items are read below for the access check and the delete, so
    # reject a malformed key dict up front instead of failing with a
    # KeyError/TypeError partway through
    required_items = ("uid", "rol", "tkn")
    if not isinstance(apikey_dict, dict) or any(
        item not in apikey_dict for item in required_items
    ):
        LOGGER.error(
            "[%s] Invalid API key revocation request, "
            "the provided API key dict is malformed.",
            payload["reqid"],
        )
        return {
            "success": False,
            "failure_reason": (
                "invalid request: malformed 'apikey_dict' in request"
            ),
            "messages": ["Invalid API key revocation request."],
        }

    # check if the user is allowed to revoke the presented API key
    apikey_revocation_allowed = check_user_access(
        {
            "user_id": user_id,
            "user_role": user_role,
            "action": "delete",
            "target_name": "apikey",
            "target_owner": apikey_dict["uid"],
            "target_visibility": "private",
            "target_sharedwith": None,
            "reqid": payload["reqid"],
            "pii_salt": payload["pii_salt"],
        },
        raiseonfail=raiseonfail,
        override_permissions_json=override_permissions_json,
        override_authdb_path=override_authdb_path,
    )

    if not apikey_revocation_allowed["success"]:
        LOGGER.error(
            "[%s] Invalid API key revocation request. "
            "from user_id: %s, role: '%s'. The API key presented is "
            "not revocable by this user.",
            payload["reqid"],
            pii_hash(user_id, payload["pii_salt"]),
            pii_hash(user_role, payload["pii_salt"]),
        )
        return {
            "success": False,
            "failure_reason": (
                "originating user is not allowed to operate on this API key"
            ),
            "messages": [
                "API key revocation failed. "
                "You are not allowed to operate on this API key."
            ],
        }

    #
    # everything checks out so go ahead and delete the API key
    #
    apikeys = meta.tables["apikeys"]

    delete = (
        apikeys.delete()
        .where(apikeys.c.apikey == apikey_dict["tkn"])
        .where(apikeys.c.user_id == apikey_dict["uid"])
        .where(apikeys.c.user_role == apikey_dict["rol"])
    )
    with engine.begin() as conn:
        result = conn.execute(delete)
        # success means exactly one stored key matched and was deleted
        success = result.rowcount == 1

    LOGGER.info(
        "[%s] API key revocation request processed. "
        "User_id: %s, role: %s, success: %s.",
        payload["reqid"],
        pii_hash(user_id, payload["pii_salt"]),
        pii_hash(user_role, payload["pii_salt"]),
        success,
    )

    return {"success": success, "messages": ["API key revocation processed."]}