Source code for authnzerver.actions.apikey_nosession

# -*- coding: utf-8 -*-
# apikey_nosession.py - Waqas Bhatti (wbhatti@astro.princeton.edu) - Aug 2018
# License: MIT - see the LICENSE file for the full text.

"""This contains functions to drive API key related auth actions.

API keys generated by this module do not require existing user sessions, so are
useful for backend API services. They are shorter-lived than API keys tied to
sessions, so come with a refresh token, which can be used to fetch a new
no-session API key.

The workflow to use here is:

1. Hit the login endpoint. Make sure the endpoint uses XSRF protection. For
   Tornado generated pages, this is fairly easy: use ``xsrf_form_html()`` in a
   template to add a form field for the token. An AJAX call can pick this up
   from the form and send it in the POST request as the ``_xsrf`` parameter. For
   an SPA, this is more difficult. For a Tornado app, the static HTML of the SPA
   endpoint can be served using a ``StaticFileHandler`` that makes sure to set
   the ``_xsrf`` cookie (session-only scoped) by calling ``self.xsrf_cookie`` in
   the ``initialize()`` or ``prepare()`` method (this sets the cookie as a
   side-effect). That way, any response back from the SPA endpoint will have the
   ``_xsrf`` cookie (this is not HttpOnly), and the POST request can then read
   the cookie and send it back as verification in the request header or POST
   request params.

2. After login is verified, run the :py:func:`.issue_apikey` function
   below. This returns an API key and a refresh token. The API key should have
   an expiry no longer than 15 minutes (or about the time needed to process a
   single API call). The refresh token has a longer expiry time (no longer than
   24 hours or maybe the actual session lifetime) to allow for the user coming
   back and having to fetch a new API key. The refresh token is effectively a
   password, and the authnzerver stores it as such, generating a random 32-byte
   token, then using Argon2-ID to hash and store it in the DB. To verify it, we
   run the Argon2-ID hash as we do for passwords.

3. For SPAs, send back the API key and its expiry date in the response body, and
   set the refresh token as an HttpOnly, Secure cookie, with the TTL set to the
   expiry date of the refresh token. For API-only calls, the refresh token will
   have to be sent in the JSON response body. HTTPS is required in all cases.

4. The client can then use the no-session API key as normal until it
   expires. The verification of the API key itself can take place statelessly if
   it is decrypted correctly, has not expired, and the the claims in the
   decrypted key dict match the API endpoint's requirements. If further
   verification is required, the frontend can call :py:func:`.verify_apikey`,
   which will check the API key against the one stored in the DB.

5. A bit before the API key expires, the client can hit an refresh-api-key
   endpoint on the frontend that is dedicated to refreshing the API key. The
   refresh token is presented in the cookie to the endpoint so the refresh
   request can be authenticated.

6. Use the :py:func:`.refresh_apikey` function to refresh the API key. The
   refresh token presented is verified, and if that passes, a new API key + its
   expiry is sent back to the client, and a NEW refresh token MUST ALSO be set
   as an HttpOnly cookie if the client is an SPA (send back the refresh token in
   the response body if not).

7. To enforce logout or account deletion or lock, the API key and the refresh
   token are deleted from the ``apikeys_nosession`` table by the
   :py:func:`.revoke_apikey` or :py:func:`.revoke_all_apikeys` function
   below. The refresh token cookie MUST also be deleted from the client if it
   hits the logout/delete endpoint successfully.

References
----------

- https://pragmaticwebsecurity.com/cheatsheets.html
- https://hasura.io/blog/best-practices-of-using-jwt-with-graphql/
- https://levelup.gitconnected.com/secure-jwts-with-backend-for-frontend-9b7611ad2afb

"""

#############
## LOGGING ##
#############

import logging
import json
from types import SimpleNamespace

# get a logger
LOGGER = logging.getLogger(__name__)


#############
## IMPORTS ##
#############

try:

    from datetime import datetime, timezone, timedelta

    utc = timezone.utc

except Exception:

    from datetime import datetime, timedelta, tzinfo

    ZERO = timedelta(0)

    class UTC(tzinfo):
        """UTC"""

        def utcoffset(self, dt):
            return ZERO

        def tzname(self, dt):
            return "UTC"

        def dst(self, dt):
            return ZERO

    utc = UTC()

import secrets

from sqlalchemy import select, insert
from argon2 import PasswordHasher

from ..permissions import pii_hash
from .access import check_user_access
from authnzerver.actions.utils import get_procdb_permjson


token_hasher = PasswordHasher()


######################
## API KEY HANDLING ##
######################


[docs]def issue_apikey( payload: dict, raiseonfail: bool = False, override_authdb_path: str = None, override_permissions_json: str = None, config: SimpleNamespace = None, ) -> dict: """Issues a new API key. This version does not require a session. Parameters ---------- payload : dict The payload dict must have the following keys: - issuer: str, the entity that will be designated as the API key issuer - audience: str, the service this API key is being issued for - subject: str, the specific API endpoint API key is being issued for - apiversion: int or str, the API version that the API key is valid for - expires_seconds: int, the number of seconds after which the API key expires - not_valid_before: float or int, the amount of seconds after utcnow() when the API key becomes valid - user_id: int, the user ID of the user requesting the API key - user_role: str, the user role of the user requesting the API key - ip_address: str, the IP address to tie the API key to - refresh_expires: int, the number of seconds after which the API key's refresh token expires - refresh_nbf: float or int, the amount of seconds after utcnow() after which the refresh token becomes valid raiseonfail : bool If True, will raise an Exception if something goes wrong. override_authdb_path : str or None If given as a str, is the alternative path to the auth DB. override_permissions_json : str or None If given as a str, is the alternative path to the permissions JSON to use. This is used to check if the user_id is allowed to actually request an API key. config : SimpleNamespace object or None An object containing systemwide config variables as attributes. This is useful when the wrapping function needs to pass in some settings directly from environment variables. Returns ------- dict The dict returned is of the form:: {'success': True or False, 'apikey': apikey dict, 'expires': expiry datetime in ISO format, 'refresh_token': refresh token str, 'refresh_token_expires': expiry of refresh token in ISO format, 'messages': list of str messages if any} Notes ----- API keys are tied to an IP address, user ID, and role. This function will return a dict with all the API key information. This entire dict should be serialized to JSON, encrypted and time-stamp signed by the frontend as the final "API key", and finally sent back to the client. """ engine, meta, permjson, dbpath = get_procdb_permjson( override_authdb_path=override_authdb_path, override_permissions_json=None, raiseonfail=raiseonfail, ) for key in ("reqid", "pii_salt"): if key not in payload: LOGGER.error( "Missing %s in payload dict. Can't process this request." % key ) return { "success": False, "failure_reason": ( "invalid request: missing '%s' in request" % key ), "apikey": None, "expires": None, "messages": ["Invalid API key request."], } for key in { "user_id", "user_role", "expires_seconds", "not_valid_before", "issuer", "audience", "subject", "ip_address", "apiversion", "refresh_expires", "refresh_nbf", }: if key not in payload: LOGGER.error( "[%s] Invalid API key request, missing %s." % (payload["reqid"], key) ) if key not in payload: return { "success": False, "failure_reason": ( "invalid request: missing '%s' from request" % key ), "apikey": None, "expires": None, "messages": ["Some required keys are missing from payload."], } # check if the provided user_id and role can actually create an API key user_id = payload["user_id"] user_role = payload["user_role"] apikey_creation_allowed = check_user_access( { "user_id": user_id, "user_role": user_role, "action": "create", "target_name": "apikey", "target_owner": user_id, "target_visibility": "private", "target_sharedwith": None, "reqid": payload["reqid"], "pii_salt": payload["pii_salt"], }, raiseonfail=raiseonfail, override_permissions_json=override_permissions_json, override_authdb_path=override_authdb_path, ) if not apikey_creation_allowed["success"]: LOGGER.error( "[%s] Invalid no-session API key issuance request. " "from user_id: %s, role: '%s'. " "The user is not allowed to create an API key." % ( payload["reqid"], pii_hash(user_id, payload["pii_salt"]), pii_hash(user_role, payload["pii_salt"]), ) ) return { "success": False, "failure_reason": ( "originating user is not allowed to issue an API key" ), "messages": [ "API key issuance failed. " "You are not allowed to issue an API key." ], } # # finally, generate the API key # random_token = secrets.token_urlsafe(32) # we'll return this API key dict to the frontend so it can JSON dump it, # encode to bytes, then encrypt, then sign it, and finally send back to the # client issued = datetime.utcnow() expires = issued + timedelta(seconds=payload["expires_seconds"]) notvalidbefore = issued + timedelta(seconds=payload["not_valid_before"]) apikey_dict = { "iss": payload["issuer"], "ver": payload["apiversion"], "uid": payload["user_id"], "rol": payload["user_role"], "aud": payload["audience"], "sub": payload["subject"], "ipa": payload["ip_address"], "tkn": random_token, "iat": issued.isoformat(), "nbf": notvalidbefore.isoformat(), "exp": expires.isoformat(), } apikey_json = json.dumps(apikey_dict) # generate the refresh token now refresh_token = secrets.token_urlsafe(32) # the refresh token is effectively a password, so we'll treat it as such hashed_refresh_token = token_hasher.hash(refresh_token) # the refresh token expiry and nbf refresh_token_expiry = issued + timedelta( seconds=payload["refresh_expires"] ) refresh_token_nbf = issued + timedelta(seconds=payload["refresh_nbf"]) # we'll also store this dict in the apikeys table apikeys = meta.tables["apikeys_nosession"] # NOTE: we store only the random token. this will later be checked for # equality against the value stored in the API key dict['tkn'] when we send # in this API key for verification later ins = insert(apikeys).values( { "apikey": random_token, "issued": issued, "expires": expires, "not_valid_before": notvalidbefore, "refresh_token": hashed_refresh_token, "refresh_issued": issued, "refresh_expires": refresh_token_expiry, "refresh_nbf": refresh_token_nbf, "user_id": payload["user_id"], "user_role": payload["user_role"], } ) with engine.begin() as conn: conn.execute(ins) # # return the API key to the frontend # LOGGER.info( "[%s] No-session API key request successful. " "user_id: %s, role: '%s', " "ip_address: %s requested a no-session API key for " "audience: '%s', subject: '%s', apiversion: %s. " "No-session API key not valid before: %s, expires on: %s. " "Refresh token for key expires on: %s." % ( payload["reqid"], pii_hash(payload["user_id"], payload["pii_salt"]), payload["user_role"], pii_hash(payload["ip_address"], payload["pii_salt"]), payload["audience"], payload["subject"], payload["apiversion"], notvalidbefore.isoformat(), expires.isoformat(), refresh_token_expiry.isoformat(), ) ) messages = ( "API key generated successfully, expires: %s." % expires.isoformat() ) return { "success": True, "apikey": apikey_json, "expires": expires.isoformat(), "refresh_token": refresh_token, "refresh_token_expires": refresh_token_expiry.isoformat(), "messages": ([messages]), }
[docs]def verify_apikey( payload: dict, raiseonfail: bool = False, override_authdb_path: str = None, override_permissions_json: str = None, config: SimpleNamespace = None, ) -> dict: """Checks if an API key is valid. This version does not require a session. Parameters ---------- payload : dict This dict contains a single key: - apikey_dict: the decrypted and verified API key info dict from the frontend. - user_id: the user ID of the person wanting to verify this key. - user_role: the user role of the person wanting to verify this key. raiseonfail : bool If True, will raise an Exception if something goes wrong. override_authdb_path : str or None If given as a str, is the alternative path to the auth DB. override_permissions_json : str or None If given as a str, is the alternative path to the permissions JSON to use. This is used to check if the user_id is allowed to actually verify ("read") an API key. config : SimpleNamespace object or None An object containing systemwide config variables as attributes. This is useful when the wrapping function needs to pass in some settings directly from environment variables. Returns ------- dict The dict returned is of the form:: {'success': True if API key is OK and False otherwise, 'messages': list of str messages if any} """ engine, meta, permjson, dbpath = get_procdb_permjson( override_authdb_path=override_authdb_path, override_permissions_json=None, raiseonfail=raiseonfail, ) for key in ("reqid", "pii_salt"): if key not in payload: LOGGER.error( "Missing %s in payload dict. Can't process this request." % key ) return { "success": False, "failure_reason": ( "invalid request: missing '%s' in request" % key ), "apikey": None, "expires": None, "messages": ["Invalid API key request."], } for key in ("apikey_dict", "user_id", "user_role"): if key not in payload: LOGGER.error( "[%s] Invalid API key request, missing %s." % (payload["reqid"], key) ) return { "success": False, "failure_reason": ( "invalid request: missing '%s' in request" % key ), "messages": ["Some required keys are missing from payload."], } apikey_dict = payload["apikey_dict"] user_id = payload["user_id"] user_role = payload["user_role"] # check if the user is allowed to read the presented API key apikey_verify_allowed = check_user_access( { "user_id": user_id, "user_role": user_role, "action": "view", "target_name": "apikey", "target_owner": apikey_dict["uid"], "target_visibility": "private", "target_sharedwith": None, "reqid": payload["reqid"], "pii_salt": payload["pii_salt"], }, raiseonfail=raiseonfail, override_permissions_json=override_permissions_json, override_authdb_path=override_authdb_path, ) if not apikey_verify_allowed["success"]: LOGGER.error( "[%s] Invalid API key verification request. " "from user_id: %s, role: %s. The API key presented is " "not readable by this user." % ( payload["reqid"], pii_hash(user_id, payload["pii_salt"]), pii_hash(user_role, payload["pii_salt"]), ) ) return { "success": False, "failure_reason": ( "originating user is not allowed to operate on this API key" ), "messages": [ "API key verification failed. " "You are not allowed to operate on this API key." ], } apikeys = meta.tables["apikeys_nosession"] # the apikey sent to us must match the stored apikey's properties: # - token # - userid # - expired must be in the future # - issued must be in the past # - not_valid_before must be in the past dt_utcnow = datetime.utcnow() sel = ( select( apikeys.c.apikey, apikeys.c.expires, ) .select_from(apikeys) .where(apikeys.c.apikey == apikey_dict["tkn"]) .where(apikeys.c.user_id == apikey_dict["uid"]) .where(apikeys.c.user_role == apikey_dict["rol"]) .where(apikeys.c.expires > dt_utcnow) .where(apikeys.c.issued < dt_utcnow) .where(apikeys.c.not_valid_before < dt_utcnow) ) with engine.begin() as conn: result = conn.execute(sel) row = result.first() if row is not None and len(row) != 0: LOGGER.info( "[%s] No-session API key verified successfully. " "user_id: %s, role: '%s', audience: '%s', subject: '%s', " "apiversion: %s, expires on: %s" % ( payload["reqid"], pii_hash(apikey_dict["uid"], payload["pii_salt"]), apikey_dict["rol"], apikey_dict["aud"], apikey_dict["sub"], apikey_dict["ver"], apikey_dict["exp"], ) ) return { "success": True, "messages": [ ( "No-session API key verified successfully. Expires: %s." % row.expires.isoformat() ) ], } else: LOGGER.error( "[%s] No-session API key verification failed. Failed key " "user_id: %s, role: '%s', audience: '%s', subject: '%s', " "apiversion: %s, expires on: %s" % ( payload["reqid"], pii_hash(apikey_dict["uid"], payload["pii_salt"]), apikey_dict["rol"], apikey_dict["aud"], apikey_dict["sub"], apikey_dict["ver"], apikey_dict["exp"], ) ) return { "success": False, "failure_reason": ( "key validation failed, " "provided key does not match stored key or has expired" ), "messages": ["API key could not be verified."], }
[docs]def revoke_apikey( payload: dict, raiseonfail: bool = False, override_authdb_path: str = None, override_permissions_json: str = None, config: SimpleNamespace = None, ): """Revokes an API key. This does not require a session. Parameters ---------- payload : dict This dict contains the following keys: - apikey_dict: the decrypted and verified API key info dict from the frontend. - user_id: the user ID of the person revoking this key. Only superusers or staff can revoke an API key that doesn't belong to them. - user_role: the user ID of the person revoking this key. Only superusers or staff can revoke an API key that doesn't belong to them. raiseonfail : bool If True, will raise an Exception if something goes wrong. override_authdb_path : str or None If given as a str, is the alternative path to the auth DB. override_permissions_json : str or None If given as a str, is the alternative path to the permissions JSON to use. This is used to check if the user_id is allowed to actually revoke ("delete") an API key. config : SimpleNamespace object or None An object containing systemwide config variables as attributes. This is useful when the wrapping function needs to pass in some settings directly from environment variables. Returns ------- dict The dict returned is of the form:: {'success': True if API key was revoked and False otherwise, 'messages': list of str messages if any} """ engine, meta, permjson, dbpath = get_procdb_permjson( override_authdb_path=override_authdb_path, override_permissions_json=None, raiseonfail=raiseonfail, ) for key in ("reqid", "pii_salt"): if key not in payload: LOGGER.error( "Missing %s in payload dict. Can't process this request." % key ) return { "success": False, "failure_reason": ( "invalid request: missing '%s' in request" % key ), "messages": ["Invalid API key revocation request."], } for key in ("apikey_dict", "user_id", "user_role"): if "apikey_dict" not in payload: LOGGER.error( "[%s] Invalid API key revocation request, missing %s." % (payload["reqid"], key) ) return { "success": False, "failure_reason": ( "invalid request: missing '%s' in request" % key ), "messages": ["Some required keys are missing from payload."], } apikey_dict = payload["apikey_dict"] user_id = payload["user_id"] user_role = payload["user_role"] # check if the user is allowed to revoke the presented API key apikey_revocation_allowed = check_user_access( { "user_id": user_id, "user_role": user_role, "action": "delete", "target_name": "apikey", "target_owner": apikey_dict["uid"], "target_visibility": "private", "target_sharedwith": None, "reqid": payload["reqid"], "pii_salt": payload["pii_salt"], }, raiseonfail=raiseonfail, override_permissions_json=override_permissions_json, override_authdb_path=override_authdb_path, ) if not apikey_revocation_allowed["success"]: LOGGER.error( "[%s] Invalid API key revocation request. " "from user_id: %s, role: %s. The API key presented is " "not revocable by this user." % ( payload["reqid"], pii_hash(user_id, payload["pii_salt"]), pii_hash(user_role, payload["pii_salt"]), ) ) return { "success": False, "failure_reason": ( "originating user is not allowed to operate on this API key" ), "messages": [ "API key revocation failed. " "You are not allowed to operate on this API key." ], } # # everything checks out so go ahead and delete the API key # apikeys = meta.tables["apikeys_nosession"] delete = ( apikeys.delete() .where(apikeys.c.apikey == apikey_dict["tkn"]) .where(apikeys.c.user_id == apikey_dict["uid"]) .where(apikeys.c.user_role == apikey_dict["rol"]) ) with engine.begin() as conn: result = conn.execute(delete) success = result.rowcount == 1 LOGGER.info( "[%s] API key revocation request processed. " "User_id: %s, role: '%s', success: %s." % ( payload["reqid"], pii_hash(user_id, payload["pii_salt"]), pii_hash(user_role, payload["pii_salt"]), success, ) ) return {"success": success, "messages": ["API key revocation processed."]}
[docs]def revoke_all_apikeys( payload: dict, raiseonfail: bool = False, override_authdb_path: str = None, override_permissions_json: str = None, config: SimpleNamespace = None, ) -> dict: """Revokes an API key. This does not require a session, but does require a current valid and unexpired API key to revoke all API keys belonging to the specified user. Parameters ---------- payload : dict This dict contains the following keys: - apikey_dict: the decrypted and verified API key info dict from the frontend. - user_id: the user ID of the person revoking this key. Only superusers or staff can revoke an API key that doesn't belong to them. - user_role: the user ID of the person revoking this key. Only superusers or staff can revoke an API key that doesn't belong to them. raiseonfail : bool If True, will raise an Exception if something goes wrong. override_authdb_path : str or None If given as a str, is the alternative path to the auth DB. override_permissions_json : str or None If given as a str, is the alternative path to the permissions JSON to use. This is used to check if the user_id is allowed to actually revoke ("delete") an API key. config : SimpleNamespace object or None An object containing systemwide config variables as attributes. This is useful when the wrapping function needs to pass in some settings directly from environment variables. Returns ------- dict The dict returned is of the form:: {'success': True if API key was revoked and False otherwise, 'messages': list of str messages if any} """ engine, meta, permjson, dbpath = get_procdb_permjson( override_authdb_path=override_authdb_path, override_permissions_json=None, raiseonfail=raiseonfail, ) for key in ("reqid", "pii_salt"): if key not in payload: LOGGER.error( "Missing %s in payload dict. Can't process this request." % key ) return { "success": False, "failure_reason": ( "invalid request: missing '%s' in request" % key ), "messages": ["Invalid API key revocation request."], } for key in ("apikey_dict", "user_id", "user_role"): if "apikey_dict" not in payload: LOGGER.error( "[%s] Invalid API key revocation request, missing %s." % (payload["reqid"], key) ) return { "success": False, "failure_reason": ( "invalid request: missing '%s' in request" % key ), "messages": ["Some required keys are missing from payload."], } apikey_dict = payload["apikey_dict"] user_id = payload["user_id"] user_role = payload["user_role"] # check if the user is allowed to revoke API keys apikey_revocation_allowed = check_user_access( { "user_id": user_id, "user_role": user_role, "action": "delete", "target_name": "apikey", "target_owner": apikey_dict["uid"], "target_visibility": "private", "target_sharedwith": None, "reqid": payload["reqid"], "pii_salt": payload["pii_salt"], }, raiseonfail=raiseonfail, override_permissions_json=override_permissions_json, override_authdb_path=override_authdb_path, ) if not apikey_revocation_allowed["success"]: LOGGER.error( "[%s] Invalid API key revocation request. " "from user_id: %s, role: %s. API keys are " "not revocable by this user." % ( payload["reqid"], pii_hash(user_id, payload["pii_salt"]), pii_hash(user_role, payload["pii_salt"]), ) ) return { "success": False, "failure_reason": ( "originating user is not allowed to delete API keys" ), "messages": [ "All API keys revocation failed. " "You are not allowed to delete API keys." ], } # # verify the presented API key # apikey_verification = verify_apikey( { "apikey_dict": apikey_dict, "user_id": user_id, "user_role": user_role, "reqid": payload["reqid"], "pii_salt": payload["pii_salt"], }, raiseonfail=raiseonfail, override_permissions_json=override_permissions_json, override_authdb_path=override_authdb_path, ) if not apikey_verification["success"]: LOGGER.error( "[%s] Invalid API key revocation request. " "from user_id: %s, role: %s. The presented API key is " "invalid and can't be used for revoking all API keys." % ( payload["reqid"], pii_hash(user_id, payload["pii_salt"]), pii_hash(user_role, payload["pii_salt"]), ) ) return { "success": False, "failure_reason": ( "provided API key is invalid, " "revoke-all operation requires a valid key to start with" ), "messages": [ "All API keys revocation failed. " "The API key presented is invalid." ], } # # everything checks out so go ahead and delete the API key # apikeys = meta.tables["apikeys_nosession"] # delete all the API keys belonging to this user ID delete = ( apikeys.delete() .where(apikeys.c.user_id == apikey_dict["uid"]) .where(apikeys.c.user_role == apikey_dict["rol"]) ) with engine.begin() as conn: result = conn.execute(delete) success = result.rowcount > 0 LOGGER.info( "[%s] All API keys revocation request processed. " "User_id: %s, role: %s, success: %s." % ( payload["reqid"], pii_hash(user_id, payload["pii_salt"]), pii_hash(user_role, payload["pii_salt"]), success, ) ) return { "success": True, "revoked_keys": result.rowcount, "messages": ["All API keys revocation processed."], }
[docs]def refresh_apikey( payload: dict, raiseonfail: bool = False, override_authdb_path: str = None, override_permissions_json: str = None, config: SimpleNamespace = None, ): """Refreshes a no-session API key. Requires a refresh token. Parameters ---------- payload : dict This dict contains the following keys: - apikey_dict: the decrypted and verified API key info dict from the frontend. - user_id: the user ID of the person revoking this key. Only superusers or staff can revoke an API key that doesn't belong to them. - user_role: the user ID of the person revoking this key. Only superusers or staff can revoke an API key that doesn't belong to them. - refresh_token: the refresh token needed to refresh the API key - ip_address: the current IP address of the user - expires_seconds: int, the number of seconds after which the API key expires - not_valid_before: float or int, the amount of seconds after utcnow() when the API key becomes valid - refresh_expires: int, the number of seconds after which the API key's refresh token expires - refresh_nbf: float or int, the amount of seconds after utcnow() after which the refresh token becomes valid raiseonfail : bool If True, will raise an Exception if something goes wrong. override_authdb_path : str or None If given as a str, is the alternative path to the auth DB. override_permissions_json : str or None If given as a str, is the alternative path to the permissions JSON to use. This is used to check if the user_id is allowed to actually refresh ("delete" then "create") an API key. config : SimpleNamespace object or None An object containing systemwide config variables as attributes. This is useful when the wrapping function needs to pass in some settings directly from environment variables. Returns ------- dict The dict returned is of the form:: {'success': True if API key was revoked and False otherwise, 'messages': list of str messages if any} """ engine, meta, permjson, dbpath = get_procdb_permjson( override_authdb_path=override_authdb_path, override_permissions_json=None, raiseonfail=raiseonfail, ) for key in ("reqid", "pii_salt"): if key not in payload: LOGGER.error( "Missing %s in payload dict. Can't process this request." % key ) return { "success": False, "failure_reason": ( "invalid request: missing '%s' in request" % key ), "messages": ["Invalid API key revocation request."], } for key in { "apikey_dict", "user_id", "user_role", "refresh_token", "ip_address", "expires_seconds", "not_valid_before", "refresh_expires", "refresh_nbf", }: if key not in payload: LOGGER.error( "[%s] Invalid no-session API key refresh request, missing %s." % (payload["reqid"], key) ) return { "success": False, "failure_reason": ( "invalid request: missing '%s' in request" % key ), "messages": ["Some required keys are missing from payload."], } apikey_dict = payload["apikey_dict"] user_id = payload["user_id"] user_role = payload["user_role"] refresh_token = payload["refresh_token"] # # go ahead and try to refresh the API key # apikeys = meta.tables["apikeys_nosession"] dt_utcnow = datetime.utcnow() # check the hashed refresh token against the stored refresh token refresh_token_sel = ( select(apikeys.c.refresh_token) .select_from(apikeys) .where(apikeys.c.apikey == apikey_dict["tkn"]) .where(apikeys.c.user_id == apikey_dict["uid"]) .where(apikeys.c.user_role == apikey_dict["rol"]) .where(apikeys.c.refresh_expires > dt_utcnow) .where(apikeys.c.refresh_nbf < dt_utcnow) .where(apikeys.c.refresh_issued < dt_utcnow) ) with engine.begin() as conn: result = conn.execute(refresh_token_sel) stored_refresh_token_hash = result.scalar() if stored_refresh_token_hash is None: LOGGER.error( "[%s] Invalid no-session API key refresh request. " "from user_id: %s, role: '%s'. " "The API key presented does not have a valid refresh token." % ( payload["reqid"], pii_hash(user_id, payload["pii_salt"]), pii_hash(user_role, payload["pii_salt"]), ) ) return { "success": False, "failure_reason": ( "provided API key has no stored refresh token, probably invalid" ), "messages": [ "API key refresh failed. " "The API key presented does not have a valid refresh token. " "You may need to login again to generate a new API key." ], } try: token_hasher.verify(stored_refresh_token_hash, refresh_token) except Exception: LOGGER.error( "[%s] Invalid no-session API key refresh request. " "from user_id: %s, role: '%s'. " "The API key presented did not pass " "refresh-token hash verification." % ( payload["reqid"], pii_hash(user_id, payload["pii_salt"]), pii_hash(user_role, payload["pii_salt"]), ) ) return { "success": False, "failure_reason": ( "provided refresh token does not match stored refresh token" ), "messages": [ "API key refresh failed. " "The API key presented does not have a valid refresh token. " "You may need to login again to generate a new API key." ], } # # now that the refresh token has been verified, delete the old API key and # then generate a new API key # revoke_try = revoke_apikey( { "apikey_dict": apikey_dict, "user_id": user_id, "user_role": user_role, "reqid": payload["reqid"], "pii_salt": payload["pii_salt"], }, raiseonfail=raiseonfail, override_authdb_path=override_authdb_path, override_permissions_json=override_permissions_json, ) if not revoke_try["success"]: LOGGER.error( "[%s] Invalid no-session API key refresh request. " "from user_id: %s, role: '%s'. " "Could not delete the old API key." % ( payload["reqid"], pii_hash(user_id, payload["pii_salt"]), pii_hash(user_role, payload["pii_salt"]), ) ) return { "success": False, "failure_reason": ( "user from provided API key not allowed to revoke old key" ), "messages": [ "API key refresh failed. " "The API key presented does not have a valid refresh token." "You may need to login again to generate a new API key." ], } new_apikey_try = issue_apikey( { "issuer": apikey_dict["iss"], "apiversion": apikey_dict["ver"], "user_id": user_id, "user_role": user_role, "audience": apikey_dict["aud"], "subject": apikey_dict["sub"], "ip_address": payload["ip_address"], "expires_seconds": payload["expires_seconds"], "not_valid_before": payload["not_valid_before"], "refresh_expires": payload["refresh_expires"], "refresh_nbf": payload["refresh_nbf"], "reqid": payload["reqid"], "pii_salt": payload["pii_salt"], }, raiseonfail=raiseonfail, override_authdb_path=override_authdb_path, override_permissions_json=override_permissions_json, ) if not new_apikey_try["success"]: LOGGER.error( "[%s] Invalid no-session API key refresh request. " "from user_id: %s, role: '%s'. " "Could not generate a new API key." % ( payload["reqid"], pii_hash(user_id, payload["pii_salt"]), pii_hash(user_role, payload["pii_salt"]), ) ) return { "success": False, "failure_reason": ( "user from provided API key not allowed to issue new key" ), "messages": [ "API key refresh failed. " "The API key presented does not have a valid refresh token." "You may need to login again to generate a new API key." ], } # # otherwise, everything is ok, return the new API key # LOGGER.info( "[%s] API key refresh request succeeded. " "User_id: %s, role: '%s'." % ( payload["reqid"], pii_hash(user_id, payload["pii_salt"]), pii_hash(user_role, payload["pii_salt"]), ) ) return new_apikey_try