# Source code for flask_cognitologin.cognitologin
"""Main module."""
import hmac
import os
import time
from datetime import datetime
from urllib.parse import urlencode

import requests
from flask import session, request, current_app, _app_ctx_stack
from jose import jwt
from requests.auth import HTTPBasicAuth
class CognitoLogin(object):
    """Flask extension for the Amazon Cognito hosted-UI login flow.

    It builds the hosted-UI login/logout URLs, exchanges the authorization
    code received on the callback URL for tokens, verifies those tokens
    against the user pool's published JSON Web Key Set and refreshes an
    expired identity with its refresh token.
    """

    #: Session key under which the anti-CSRF ``state`` value is stored.
    _CSRF_SESSION_KEY = 'mycogext_csrf_state'

    #: Seconds to wait for a Cognito HTTP endpoint before giving up.
    #: Without a timeout ``requests`` may block a worker indefinitely.
    request_timeout = 10

    def __init__(self, app=None):
        """Create the extension, optionally binding it to ``app``.

        :param app: Flask application; when given, :meth:`init_app` is
            called immediately.
        """
        self.app = app
        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        """Initialize the extension

        This extension needs the session, so ``SECRET_KEY`` should be
        configured::

            app.config['SECRET_KEY'] = 'some-secret-of-my-own'

        You also need:

        * ``AWS_REGION``: aws region of your cognito user pool
        * ``COGNITO_POOL_ID``: Cognito user pool ID
        * ``COGNITO_DOMAIN``: this is the full hostname of the cognito
          domain, for example
          ``mycogdomain.auth.eu-west-1.amazoncognito.com``,
          refer to `aws documentation`_
        * ``COGNITO_CLIENT_ID``: Your cognito client ID
        * ``COGNITO_CALLBACK_URL``: URL for the `authorization code grant`_,
          cognito will call to this URL with the user tokens
        * ``COGNITO_CLIENT_SECRET``: Your cognito client secret

        :param app: Flask application to configure
        :raises ValueError: if the config keys are missing

        .. _aws documentation: https://shorturl.at/tuwBF
        .. _authorization code grant: https://shorturl.at/pFIKR
        """
        config = app.config
        required = (
            'AWS_REGION', 'COGNITO_POOL_ID', 'COGNITO_DOMAIN',
            'COGNITO_CLIENT_ID', 'COGNITO_CALLBACK_URL',
            'COGNITO_CLIENT_SECRET')
        missing = [key for key in required if config.get(key) is None]
        if missing:
            # Name the offending keys so the misconfiguration is obvious.
            raise ValueError(
                "Missing config keys for flask_cognito: %s"
                % ", ".join(missing))
        app.teardown_appcontext(self.teardown)

    def _getCsrfState(self):
        """Store a fresh random ``state`` value in the session, return it."""
        state = os.urandom(16).hex()
        session[self._CSRF_SESSION_KEY] = state
        return state

    def getSignInUrl(self):
        """Return the cognito url for login

        A new CSRF ``state`` value is stored in the session on every call.
        Query values are URL-encoded so that a callback URL containing
        reserved characters cannot corrupt the query string.
        """
        config = current_app.config
        query = urlencode([
            ('response_type', 'code'),
            ('client_id', config.get('COGNITO_CLIENT_ID')),
            ('state', self._getCsrfState()),
            ('redirect_uri', config.get('COGNITO_CALLBACK_URL')),
        ])
        return "https://{domain}/login?{query}".format(
            domain=config.get('COGNITO_DOMAIN'), query=query)

    def getLogOutUrl(self):
        """Return the cognito logout url (query values are URL-encoded)"""
        config = current_app.config
        query = urlencode([
            ('response_type', 'code'),
            ('client_id', config.get('COGNITO_CLIENT_ID')),
            ('redirect_uri', config.get('COGNITO_CALLBACK_URL')),
        ])
        return "https://{domain}/logout?{query}".format(
            domain=config.get('COGNITO_DOMAIN'), query=query)

    def _requestTokens(self, payload):
        """POST ``payload`` to the token endpoint using client credentials.

        :param dict payload: form fields of the token request
        :returns: the decoded JSON response, or ``None`` when the endpoint
            answers with an error status
        :rtype: dict
        """
        config = current_app.config
        response = requests.post(
            "https://%s/oauth2/token" % config.get('COGNITO_DOMAIN'),
            data=payload,
            auth=HTTPBasicAuth(
                config.get('COGNITO_CLIENT_ID'),
                config.get('COGNITO_CLIENT_SECRET')
            ),
            timeout=self.request_timeout
        )
        if not response.ok:
            return None
        return response.json()

    def _buildIdentity(self, tokens, refresh_token):
        """Verify ``tokens`` and return the identity claims.

        The access token is verified first; the ID token is then verified
        together with the access token, which is handed to ``jwt.decode``.

        :param dict tokens: token endpoint response holding
            ``access_token`` and ``id_token``
        :param str refresh_token: refresh token to attach to the identity
        :returns: the ID token claims plus a ``refresh_token`` key
        :rtype: dict
        """
        self._verify(tokens['access_token'])
        claims = self._verify(
            tokens['id_token'],
            access_token=tokens['access_token'])
        identity = dict(claims)
        identity['refresh_token'] = refresh_token
        return identity

    def getIdentity(self):
        """Process cognito authorization code grant

        Reads ``state`` and ``code`` from the current request's query
        string. The ``state`` is checked against the value stored in the
        session *before* the code is sent to Cognito, so a forged callback
        neither triggers a network call nor consumes the code. The stored
        state is removed from the session, so it can be used only once.

        Token verification errors raised by ``jose`` are not caught here.

        :return: the user identity or None
        :rtype: dict
        """
        received_state = request.args.get('state')
        code = request.args.get('code')
        expected_state = session.pop(self._CSRF_SESSION_KEY, None)
        if not (code and received_state and expected_state):
            return None
        # Compare as bytes: compare_digest rejects non-ASCII str input.
        if not hmac.compare_digest(
                expected_state.encode('utf-8'),
                received_state.encode('utf-8')):
            return None

        config = current_app.config
        tokens = self._requestTokens({
            'grant_type': 'authorization_code',
            'client_id': config.get('COGNITO_CLIENT_ID'),
            'code': code,
            "redirect_uri": config.get('COGNITO_CALLBACK_URL')
        })
        if tokens is None:
            return None
        return self._buildIdentity(tokens, tokens['refresh_token'])

    def getTokens(self, refresh_token):
        """Returns the ``id_token`` and ``access_token``

        :param str refresh_token: refresh token for the user
        :returns: a ``dict`` with the keys ``id_token`` and
            ``access_token``, or ``None`` if the token endpoint refuses
            the request
        :rtype: dict
        """
        config = current_app.config
        tokens = self._requestTokens({
            'grant_type': 'refresh_token',
            'client_id': config.get('COGNITO_CLIENT_ID'),
            'refresh_token': refresh_token
        })
        if tokens is None:
            return None
        return {
            'access_token': tokens['access_token'],
            'id_token': tokens['id_token']
        }

    def checkIdentity(self, identity):
        """Check identity claims

        If the current identity has expired, new tokens are requested with
        its refresh token and a new identity is returned. An identity that
        has not expired yet is returned unchanged.

        If ``identity`` is empty or lacks the ``exp`` or ``refresh_token``
        keys, or if the refresh fails, this returns ``None``

        :param dict identity: current user identity claims
        :returns: identity
        :rtype: dict
        """
        if not identity:
            return None
        if 'exp' not in identity or 'refresh_token' not in identity:
            return None
        # ``exp`` is compared with the current epoch time directly, which
        # avoids the deprecated naive-UTC datetime helpers.
        if identity['exp'] >= time.time():
            return identity

        refresh_token = identity['refresh_token']
        tokens = self.getTokens(refresh_token)
        if not tokens:
            return None
        return self._buildIdentity(tokens, refresh_token)

    def _issuer(self):
        """Return the user pool URL built from region and pool ID."""
        config = current_app.config
        return "https://cognito-idp.{}.amazonaws.com/{}".format(
            config.get('AWS_REGION'),
            config.get('COGNITO_POOL_ID')
        )

    def _verify(self, token, access_token=None):
        """Verify a cognito JWT

        Get the key id from the header, locate it in the cognito keys
        and verify the token with that key. Accepted algorithms are
        restricted to RS256 and the issuer must be this user pool.

        :param str token: encoded JWT to verify
        :param str access_token: access token passed through to
            ``jwt.decode`` when verifying an ID token
        :returns: the decoded claims
        :rtype: dict
        :raises jose.JWTError: if no published key matches the token's
            ``kid`` or the token fails verification
        """
        header = jwt.get_unverified_header(token)
        config = current_app.config
        kid = header.get('kid')
        key = next(
            (k for k in (self.JWKS or ()) if k.get("kid") == kid), None)
        if key is None:
            raise jwt.JWTError("No cognito key matches the token key id")
        return jwt.decode(
            token, key,
            algorithms=['RS256'],
            audience=config.get('COGNITO_CLIENT_ID'),
            issuer=self._issuer(),
            access_token=access_token)

    def teardown(self, exception):
        """App-context teardown hook; nothing to do here right now."""
        pass

    @property
    def JWKS(self):
        """Signing keys of the user pool, cached on the app context.

        The key set is downloaded on first access within an application
        context and stored on that context as ``aws_jwkeys``.

        :returns: list of JWK dicts, or ``None`` without an app context
        :raises requests.HTTPError: if the key set cannot be downloaded
        """
        # NOTE(review): ``_app_ctx_stack`` is believed to be deprecated in
        # Flask 2.2 and removed in 2.3 -- confirm the supported Flask range.
        ctx = _app_ctx_stack.top
        if ctx is None:
            return None
        if not hasattr(ctx, 'aws_jwkeys'):
            response = requests.get(
                self._issuer() + "/.well-known/jwks.json",
                timeout=self.request_timeout)
            response.raise_for_status()
            ctx.aws_jwkeys = response.json()["keys"]
        return ctx.aws_jwkeys