Commit 3542c957 authored by jackie / Andrea Ida Malkah Klaura's avatar jackie / Andrea Ida Malkah Klaura
Browse files

add python version of implicit flow

parent c343bb43
......@@ -27,6 +27,20 @@ the OIDC flow you use in your client.
## Usage
### Bash
For the shell functions take a look at [bash/main.sh](bash/main.sh) as a
starting point. This shows how to use the include files and get a token
with one of the three provided flows.
### Python
## Todo
- add at least one call to the API including a bearer token in the main demos
- test for invalid credentials and provide error text
- add user-agent header and make it configurable
- make bash functions exit at the end of each stage in case of errors
- add checks for state and nonce
- check, why hybdid flow does not return access code
config.py
__pycache__
#!/usr/bin/python3
from config import config as cfg
from steering import implicit
parameters = {
"response_type": "id_token token",
}
oidc = implicit.get_token(cfg, parameters)
print(oidc)
import requests
import re
def initiate_flow (cfg, parameters):
"""
Initiates an OIDC flow and returns the URL to the login form
"""
headers = {"User-Agent": cfg["user_agent"]}
payload = {
"client_id": cfg["client_id"],
"scope": cfg["scope"],
"redirect_uri": cfg["redirect_uri"],
"response_type": parameters["response_type"],
"state": parameters["state"],
"nonce": parameters["nonce"],
}
url = cfg["base_url"] + cfg["authorize_endpoint"]
try:
response = requests.get(url, headers=headers, params=payload, allow_redirects=False)
except Error as e:
print(e)
return response.headers["Location"]
def handle_login_form (cfg, parameters):
"""
Requests the login form and submits all necessary data, including a separate
post to confirm consent if required by the auth backend. After success the
final callback redirect URL is returned
"""
headers = {"User-Agent": cfg["user_agent"]}
url = cfg["base_url"] + parameters["location"]
try:
response = requests.get(url, headers=headers, allow_redirects=False)
except Error as e:
print(e)
# save cookies (including csrf token and session id), and extract form data
jar = response.cookies
m = re.search('<form action="([^"]*)"', response.text)
submit_url = m.groups()[0]
m = re.search("<input type='hidden' name='csrfmiddlewaretoken' value='([^']*)'", response.text)
csrf_mw_token = m.groups()[0]
m = re.search('<input type="hidden" name="next" value="([^"]*)"', response.text)
next_field = m.groups()[0]
# submit login data
url = cfg["base_url"] + submit_url
payload = {
"username": cfg["username"],
"password": cfg["password"],
"csrfmiddlewaretoken": csrf_mw_token,
"next": next_field,
}
try:
response = requests.post(url, headers=headers, cookies=jar, data=payload, allow_redirects=False)
except Error as e:
print(e)
# attempt to retrieve final callback redirect
jar = response.cookies
url = cfg["base_url"] + next_field
try:
response = requests.get(url, headers=headers, cookies=jar, allow_redirects=False)
except Error as e:
print(e)
# if explicit consent is required, we will not be redirected but get a
# consent form which we have to submit, before the final redirect can happen
if response.status_code != 302:
# extract form data (cookies from last request have to be reused)
m = re.search('<form method="post" action="([^"]*)"', response.text)
submit_url = m.groups()[0]
m = re.search("<input type='hidden' name='csrfmiddlewaretoken' value='([^']*)'", response.text)
csrf_mw_token = m.groups()[0]
# submit consent form
url = cfg["base_url"] + submit_url
payload = {
"csrfmiddlewaretoken": csrf_mw_token,
"client_id": cfg["client_id"],
"redirect_uri": cfg["redirect_uri"],
"response_type": parameters["response_type"],
"scope": cfg["scope"],
"state": parameters["state"],
"nonce": parameters["nonce"],
"allow": "Authorize",
}
try:
response = requests.post(url, headers=headers, cookies=jar, data=payload, allow_redirects=False)
except Error as e:
print(e)
# return callback location
return response.headers["Location"]
def get_token_from_callback (url):
"""
Extract any relevant information from a callback redirect URL
"""
# TODO: this is tailored towards an implicit flow! make generic and only
# set those pieces that are available
oidc = {}
m = re.search('access_token=([^&]*)', url)
if m:
oidc["access_token"] = m.groups()[0]
m = re.search('id_token=([^&]*)', url)
if m:
oidc["id_token"] = m.groups()[0]
return oidc
from random import randint
from . import flow_stages
def get_token (cfg, parameters):
# generate a nonce and a random state
parameters["state"] = ''.join([str(randint(0, 9)) for i in range(16)]),
parameters["nonce"] = ''.join([str(randint(0, 9)) for i in range(16)]),
# initiate the OIDC flow and set the location for the login form
parameters["location"] = flow_stages.initiate_flow(cfg, parameters)
# submit the login form and retrieve the callback URL
parameters["callback"] = flow_stages.handle_login_form(cfg, parameters)
# return the token information extracted from the callback
return flow_stages.get_token_from_callback(parameters["callback"])
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment