Commit 1b52f79b authored by jackie / Andrea Ida Malkah Klaura's avatar jackie / Andrea Ida Malkah Klaura

add authorization code flow in python stub

parent 05599e4a
......@@ -3,6 +3,7 @@
import argparse
from config import config as cfg
from steering import implicit
from steering import authorization
import tank.session
import requests
......@@ -14,6 +15,8 @@ parser.add_argument("-s", "--steering", action="store_true",
help="Make an authorised test call against the steering API")
parser.add_argument("-t", "--tank", action="store_true",
help="Make an authorised test call against the tank API")
parser.add_argument("-m", "--mode", choices=["implicit", "authorization"],
default="implicit", help="The OIDC flow mode (default: %(default)s)")
args = parser.parse_args()
if args.verbosity:
......@@ -22,12 +25,23 @@ else:
cfg["verbosity"] = 0
# we have to set the response_type according to our flow type
parameters = {
"response_type": "id_token token",
}
parameters = { "response_type": "id_token token" }
if args.mode == "authorization":
parameters["response_type"] = "code"
# now we can retrieve an access token from steering
oidc = implicit.get_token(cfg, parameters)
if args.mode == "implicit":
# demo of the implicit flow
if cfg["verbosity"] >= 1:
print("Using implicit flow")
oidc = implicit.get_token(cfg, parameters)
elif args.mode == "authorization":
# demo of the authorization code flow
if cfg["verbosity"] >= 1:
print("Using authorization code flow")
oidc = authorization.get_auth_code(cfg, parameters)
oidc = authorization.get_token_from_code(cfg, oidc["code"])
if cfg["verbosity"] == 0:
print("access token:", oidc["access_token"])
......
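Review bot: In the authorization branch, `oidc["code"]` is read without checking that the callback actually carried a code; get_token_from_callback only sets that key when its regex matches, so a failed login or an error redirect ends in a bare KeyError. A guard such as `if "code" not in oidc: raise SystemExit("no authorization code in callback")` before the exchange would make the failure explicit. The same applies to `oidc["access_token"]` in the final print, since get_token_from_code returns the token endpoint's JSON without checking the HTTP status or looking for an error response. The script continues outside the visible lines.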
from random import randint
from . import flow_stages
import requests
def get_auth_code (cfg, parameters):
# generate a nonce and a random state
parameters["state"] = ''.join([str(randint(0, 9)) for i in range(16)]),
parameters["nonce"] = ''.join([str(randint(0, 9)) for i in range(16)]),
# initiate the OIDC flow and set the location for the login form
parameters["location"] = flow_stages.initiate_flow(cfg, parameters)
# submit the login form and retrieve the callback URL
parameters["callback"] = flow_stages.handle_login_form(cfg, parameters)
# return the token information extracted from the callback
return flow_stages.get_token_from_callback(cfg, parameters["callback"])
def get_token_from_code (cfg, code):
"""
Retrieves bearer and ID tokens from AURA steering, given a correct
authorization code and client secret are provided
"""
url = cfg["base_url"] + cfg["token_endpoint"]
headers = {"User-Agent": cfg["user_agent"]}
payload = {
"client_id": cfg["client_id"],
"client_secret": cfg["client_secret"],
"code": code,
"redirect_uri": cfg["redirect_uri"],
"grant_type": "authorization_code",
}
try:
if cfg["verbosity"] >= 1:
print("Calling OIDC token endpoint to exchange access code for token")
response = requests.post(url, headers=headers, data=payload, allow_redirects=False)
except Error as e:
e = sys.exc_info()
print(e[0].__name__, ':', e[1])
sys.exit(1)
if cfg["verbosity"] >= 2:
print(response.json())
return response.json()
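Review bot: In get_auth_code the `state` and `nonce` values are built from `random.randint`, which is not a cryptographically secure source, and the trailing comma on both assignments makes each value a one-element tuple rather than a string. These values are meant to bind the callback and the ID token to this request, so they should be unpredictable: `parameters["state"] = secrets.token_urlsafe(16)` and the same for `nonce`, with `import secrets` at the top. Separately, in get_token_from_code the handler `except Error as e:` names an undefined `Error` and its body uses `sys` without importing it, so a failed POST would raise NameError instead of exiting cleanly; `except requests.exceptions.RequestException:` plus `import sys` would match the apparent intent. Nothing visible here compares the returned `state` or the token's `nonce` with the values that were sent; if flow_stages does not do that either, generating them gives no protection.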
......@@ -144,8 +144,6 @@ def get_token_from_callback (cfg, url):
print("Stage 5: processing the callback redirect URL")
if cfg["verbosity"] >= 2:
print("callback URL:", url)
# TODO: this is tailored towards an implicit flow! make generic and only
# set those pieces that are available
oidc = {}
m = re.search('access_token=([^&]*)', url)
if m:
......@@ -153,6 +151,9 @@ def get_token_from_callback (cfg, url):
m = re.search('id_token=([^&]*)', url)
if m:
oidc["id_token"] = m.groups()[0]
m = re.search('code=([^&]*)', url)
if m:
oidc["code"] = m.groups()[0]
if cfg["verbosity"] >= 1:
print("Successfully retrieved access codes/tokens:", oidc)
return oidc
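Review bot: The `code=([^&]*)` search (apparently the lines this commit adds) is not anchored to a parameter boundary, so it also matches inside any parameter whose name ends in `code`, for example a constructed `error_code=...`, and the captured value is not percent-decoded before it is sent on to the token endpoint. Anchoring the pattern, `m = re.search(r'[?&#]code=([^&#]*)', url)`, and passing the group through `urllib.parse.unquote` would address both; parsing the query and fragment with `urllib.parse.parse_qs` would be cleaner still. The callback's `state` is not extracted here either, so callers cannot compare it with the value they sent. Note also that at verbosity 1 the whole `oidc` dict, including the code and tokens, is printed to stdout. get_token_from_callback starts outside the hunks shown.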