先决条件:
需要在lzc-manifest.yml定义 oidc_redirect_path 和 environment。
配置lzc-manifest.yml oidc_redirect_path 就是你的应用的回调地址,只有写了这个之后才能正确使用 OpenID Connect 的环境变量。
回调地址是按照应用而定的,有的是/callback,/oidc/callback 或者/oauth/callback。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 lzc-sdk-version: 0.1 name: 懒猫ENV查看器 package: xu.deploy.env version: 0.0 .2 description: license: https://choosealicense.com/licenses/mit/ homepage: author: xu application: subdomain: env oidc_redirect_path: /callback routes: - /=exec://5005,./lzcapp/pkg/content/run.sh environment: - LAZYCAT_AUTH_OIDC_CLIENT_ID=${LAZYCAT_AUTH_OIDC_CLIENT_ID} - LAZYCAT_AUTH_OIDC_CLIENT_SECRET=${LAZYCAT_AUTH_OIDC_CLIENT_SECRET} - LAZYCAT_AUTH_OIDC_AUTH_URI=${LAZYCAT_AUTH_OIDC_AUTH_URI} - LAZYCAT_AUTH_OIDC_TOKEN_URI=${LAZYCAT_AUTH_OIDC_TOKEN_URI} - LAZYCAT_AUTH_OIDC_USERINFO_URI=${LAZYCAT_AUTH_OIDC_USERINFO_URI} - LAZYCAT_AUTH_OIDC_ISSUER_URI=${LAZYCAT_AUTH_OIDC_ISSUER_URI}
定义了环境变量之后,我们就可以在代码中使用环境变量:
开机时一次性从环境变量读取懒猫微服的应用域名、OIDC 客户端 ID/密钥,以及授权、令牌、用户信息三个核心端点,并根据应用域名拼出默认 Redirect URI,从而把所有与 OpenID Connect 登录相关的敏感信息解耦。
这里的 callback 是应用的回调 URL,需要根据应用调整。
1 2 3 4 5 6 7 8 9 LAZYCAT_BOX_DOMAIN = os.environ.get('LAZYCAT_BOX_DOMAIN' ) LAZYCAT_APP_DOMAIN = os.environ.get('LAZYCAT_APP_DOMAIN' ) CLIENT_ID = os.getenv("LAZYCAT_AUTH_OIDC_CLIENT_ID" ) CLIENT_SECRET = os.getenv("LAZYCAT_AUTH_OIDC_CLIENT_SECRET" ) AUTH_ENDPOINT = os.getenv("LAZYCAT_AUTH_OIDC_AUTH_URI" ) TOKEN_ENDPOINT = os.getenv("LAZYCAT_AUTH_OIDC_TOKEN_URI" ) USERINFO_ENDPOINT = os.getenv("LAZYCAT_AUTH_OIDC_USERINFO_URI" ) REDIRECT_URI = os.getenv("OIDC_REDIRECT_URI" , f"https://{LAZYCAT_APP_DOMAIN} /callback" )
登录功能 在用户访问 /login 时动态生成一对 PKCE 凭据(随机 code verifier 和其 SHA-256 派生的 code challenge),把 verifier 暂存进会话,再携带 challenge 等参数构造 OIDC 授权码请求,并将用户浏览器重定向到身份提供方完成安全登录;回调阶段可用 session 中的 code verifier 与返回的 code exchange 配合,防止授权码被劫持或重放,从而提升 OAuth 2.0/OIDC 的安全性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 def generate_pkce_pair (): code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32 )).rstrip(b'=' ).decode() code_challenge = base64.urlsafe_b64encode( hashlib.sha256(code_verifier.encode()).digest() ).rstrip(b'=' ).decode() return code_verifier, code_challenge @app.route('/login' ) def login (): code_verifier, code_challenge = generate_pkce_pair() session['code_verifier' ] = code_verifier auth_url = ( f"{AUTH_ENDPOINT} " f"?response_type=code" f"&client_id={CLIENT_ID} " f"&redirect_uri={REDIRECT_URI} " f"&scope=openid profile email" f"&code_challenge={code_challenge} " f"&code_challenge_method=S256" ) print (auth_url) return redirect(auth_url)
generate_pkce_pair()
使用 secrets.token_bytes(32) 随机生成 32 字节高强度随机数。
先经 base64.urlsafe_b64encode 再去掉尾部 = 得到 code verifier 。
对 code verifier 做 SHA-256 散列后再次 base64 URL 安全编码并去掉 =,得到 code challenge 。
返回二元组 (code_verifier, code_challenge)。
/login 路由
调用 generate_pkce_pair() 生成并拿到 code_verifier 和 code_challenge。
将 code_verifier 写入 Flask session,以便稍后在回调时校验。
拼接授权端点 AUTH_ENDPOINT 形成认证 URL:
response_type=code 采用授权码模式
client_id、redirect_uri、scope 等常规 OIDC 参数
code_challenge 与 code_challenge_method=S256 声明使用 PKCE(S256)
redirect(auth_url) 将浏览器跳转到身份提供方进行登录 + 授权
回调地址 /callback 处理函数先从回调参数取出授权码,再用会话里的 code verifier 按 PKCE + 授权码模式向令牌端点换取 access token 和 ID token;成功后用 access token 调 /userinfo 获取用户资料,并把三者一起返回。如此既完成了 OAuth 2.0 的安全换码,又拿到了 OIDC 提供的登录身份信息,实现前后端分离的单点登录闭环。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @app.route('/callback' ) def callback (): code = request.args.get('code' ) code_verifier = session.get('code_verifier' ) headers = {'Content-Type' : 'application/x-www-form-urlencoded' } data = { 'grant_type' : 'authorization_code' , 'client_id' : CLIENT_ID, 'client_secret' : CLIENT_SECRET, 'code' : code, 'redirect_uri' : REDIRECT_URI, 'code_verifier' : code_verifier, } token_resp = requests.post(TOKEN_ENDPOINT, data=data, headers=headers) print (token_resp) token_data = token_resp.json() print (token_data) access_token = token_data.get('access_token' ) id_token = token_data.get('id_token' ) userinfo_resp = requests.get(USERINFO_ENDPOINT, headers={ 'Authorization' : f'Bearer {access_token} ' }) return { 'Access Token' : access_token, 'ID Token' : id_token, 'UserInfo' : userinfo_resp.json() }
code = request.args.get('code')
从回调 URL 查询参数中取出授权服务器返回的 code(授权码)。
code_verifier = session.get('code_verifier')
读取先前 /login 时存进会话的 code verifier ,准备用于 PKCE 校验。
准备换取令牌的 HTTP POST 请求
1 2 3 4 5 6 7 8 headers = {'Content-Type' : 'application/x-www-form-urlencoded' } data = { 'grant_type' : 'authorization_code' , 'client_id' : CLIENT_ID, 'code' : code, 'redirect_uri' : REDIRECT_URI, 'code_verifier' : code_verifier, }
code_verifier 会被身份提供方与首跳收到的 code_challenge 做 SHA-256 对比,从而证明客户端的“持有者”身份,防止授权码被截获后被第三方滥用。
token_resp = requests.post(TOKEN_ENDPOINT, data=data, headers=headers)
向令牌端点发送表单数据以换取 Access Token / ID Token 。
token_data = token_resp.json()
解析 JSON 响应。例如:
1 2 3 4 5 6 7 { "access_token" : "..." , "id_token" : "..." , "expires_in" : 3600 , "token_type" : "Bearer" , ... }
access_token = token_data.get('access_token')
读取访问令牌,用于调用受保护 API。id_token = token_data.get('id_token')
读取 OIDC ID Token ,携带用户身份声明,可本地解码验证。
获取用户信息
1 2 3 4 userinfo_resp = requests.get( USERINFO_ENDPOINT, headers={'Authorization' : f'Bearer {access_token} ' } )
按 OIDC 规范,用 Bearer Token 调 /userinfo 端点,拿到 JSON 形式的用户信息。
返回聚合结果(此处直接返回给浏览器以便演示)
1 2 3 4 5 return { 'Access Token' : access_token, 'ID Token' : id_token, 'UserInfo' : userinfo_resp.json() }
在浏览器中可以看到这个登录跳转:
还是这个图,我们继续看这个流程:
拿到 code 之后可以就可以换到 Access token 和 ID Token 了,这个 code 只有一次有效,可以达到防重放的效果。当然这个只是 OIDC 的一个例子,在生产环境的 APP 中还需要做路由守卫以及 access 续签的操作。
完整代码如下:
读取配置
从环境变量获取 Lazycat 平台的域名、OIDC 客户端 ID/密钥,以及授权端点、令牌端点、用户信息端点和回调地址。
用随机 app.secret_key 支持 Flask Session。
PKCE 安全增强
generate_pkce_pair() 动态生成 code_verifier / code_challenge ;后者随登录请求携带,前者保存在 Session,回调时再带给 Token 端点,防止授权码被劫持。
核心路由
/:渲染首页(需自备 index.html)。
/login:
生成 PKCE 对;
拼接授权 URL(response_type=code,scope 含 openid profile email);
浏览器重定向到 IdP 登录/授权页面。
/callback:
取回 code 与 code_verifier;
POST 到 TOKEN_ENDPOINT 换取 access_token 和 id_token;
用 access_token 调用 USERINFO_ENDPOINT 拿到用户信息;
以 JSON 形式返回令牌与用户资料。
注意 :生产环境应关闭 debug=True、使用 HTTPS、校验 state 参数防 CSRF,并妥善处理 Token 异常和错误分支。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 import osimport base64import hashlibimport secretsimport requestsfrom flask import Flask, redirect, request, session, url_for, render_templateapp = Flask(__name__) app.secret_key = os.urandom(24 ) LAZYCAT_BOX_DOMAIN = os.environ.get('LAZYCAT_BOX_DOMAIN' ) LAZYCAT_APP_DOMAIN = os.environ.get('LAZYCAT_APP_DOMAIN' ) CLIENT_ID = os.getenv("LAZYCAT_AUTH_OIDC_CLIENT_ID" ) CLIENT_SECRET = os.getenv("LAZYCAT_AUTH_OIDC_CLIENT_SECRET" ) AUTH_ENDPOINT = os.getenv("LAZYCAT_AUTH_OIDC_AUTH_URI" ) TOKEN_ENDPOINT = os.getenv("LAZYCAT_AUTH_OIDC_TOKEN_URI" ) USERINFO_ENDPOINT = os.getenv("LAZYCAT_AUTH_OIDC_USERINFO_URI" ) REDIRECT_URI = os.getenv("OIDC_REDIRECT_URI" , f"https://{LAZYCAT_APP_DOMAIN} /callback" ) print (CLIENT_ID)print (CLIENT_SECRET)print (AUTH_ENDPOINT)print (TOKEN_ENDPOINT)print (USERINFO_ENDPOINT)print (REDIRECT_URI)def generate_pkce_pair (): code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32 )).rstrip(b'=' ).decode() code_challenge = base64.urlsafe_b64encode( hashlib.sha256(code_verifier.encode()).digest() ).rstrip(b'=' ).decode() return code_verifier, code_challenge @app.route('/' ) def index (): return render_template('index.html' ) @app.route('/login' ) def login (): code_verifier, code_challenge = generate_pkce_pair() session['code_verifier' ] = code_verifier auth_url = ( f"{AUTH_ENDPOINT} " f"?response_type=code" f"&client_id={CLIENT_ID} " f"&redirect_uri={REDIRECT_URI} " f"&scope=openid profile email" f"&code_challenge={code_challenge} " f"&code_challenge_method=S256" ) print (auth_url) return redirect(auth_url) @app.route('/callback' ) def callback (): code = request.args.get('code' ) code_verifier = session.get('code_verifier' ) headers = {'Content-Type' : 'application/x-www-form-urlencoded' } data = { 'grant_type' : 'authorization_code' , 'client_id' : CLIENT_ID, 'client_secret' : CLIENT_SECRET, 'code' : code, 'redirect_uri' : REDIRECT_URI, 'code_verifier' : code_verifier, } token_resp = requests.post(TOKEN_ENDPOINT, data=data, headers=headers) print (token_resp) token_data = token_resp.json() print (token_data) access_token = token_data.get('access_token' ) id_token = token_data.get('id_token' ) userinfo_resp = requests.get(USERINFO_ENDPOINT, headers={ 'Authorization' : f'Bearer {access_token} ' }) return { 'Access Token' : access_token, 'ID Token' : id_token, 'UserInfo' : userinfo_resp.json() } if __name__ == '__main__' : app.run(debug=True )
如果使用 authlib 是这样子的.通过 Authlib 把应用接入 OIDC:
启动时先从环境变量读取并校验客户端 ID、密钥及各端点;
随后注册 OIDC 客户端并自动启用 PKCE。
用户访问 /login 时,服务端生成 nonce 并调用 authorize_redirect() 将浏览器跳转到身份提供方登录,同时在会话里保存随机值;身份提供方完成认证后回调到 /callback,authorize_access_token() 会携带先前的 code 和 code verifier 去换取 access token / ID token,并用保存的 nonce 校验 ID Token 防止重放。
成功后解析得到的声明(用户信息)渲染或写入 Session,即可认为用户已登录。如此利用现成库把 PKCE、状态验证、ID Token 验签等安全细节都交给框架处理,只需少量代码就实现了安全的单点登录闭环。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 import os, secrets from flask import Flask, request, render_template from authlib.integrations.flask_client import OAuth LAZYCAT_BOX_DOMAIN = os.environ.get('LAZYCAT_BOX_DOMAIN') LAZYCAT_APP_DOMAIN = os.environ.get('LAZYCAT_APP_DOMAIN') CLIENT_ID = os.getenv("LAZYCAT_AUTH_OIDC_CLIENT_ID") CLIENT_SECRET = os.getenv("LAZYCAT_AUTH_OIDC_CLIENT_SECRET") AUTH_URI = os.getenv("LAZYCAT_AUTH_OIDC_AUTH_URI") TOKEN_URI = os.getenv("LAZYCAT_AUTH_OIDC_TOKEN_URI") USERINFO_URI = os.getenv("LAZYCAT_AUTH_OIDC_USERINFO_URI") JWKS_URI = os.getenv("OIDC_JWKS_URI", f"https://{LAZYCAT_BOX_DOMAIN}/sys/oauth/keys") REDIRECT_URI = os.getenv("OIDC_REDIRECT_URI", f"https://{LAZYCAT_APP_DOMAIN}/callback") required = [CLIENT_ID, CLIENT_SECRET, AUTH_URI, TOKEN_URI, USERINFO_URI, JWKS_URI, REDIRECT_URI] if not all(required): missing = [k for k, v in zip( ["OIDC_CLIENT_ID","OIDC_CLIENT_SECRET","OIDC_AUTH_URI", "OIDC_TOKEN_URI","OIDC_USERINFO_URI","OIDC_JWKS_URI", "OIDC_REDIRECT_URI"], required) if not v] raise RuntimeError(f"缺少环境变量: {', '.join(missing)}") app = Flask(__name__) app.secret_key = "a-very-secret-key" oauth = OAuth(app) oidc = oauth.register( name="casdoor", client_id=CLIENT_ID, client_secret=CLIENT_SECRET, authorize_url=AUTH_URI, access_token_url=TOKEN_URI, userinfo_endpoint=USERINFO_URI, jwks_uri=JWKS_URI, client_kwargs={"scope": "openid profile email"}, redirect_uri=REDIRECT_URI, ) @app.route("/") def index(): return render_template("index.html") @app.route("/login") def login(): nonce = secrets.token_urlsafe(16) resp = oidc.authorize_redirect(redirect_uri=REDIRECT_URI, nonce=nonce) resp.set_cookie("oidc_nonce", nonce, max_age=300, httponly=True) return resp @app.route("/callback") def callback(): token = oidc.authorize_access_token() nonce = request.cookies.get("oidc_nonce") claims = oidc.parse_id_token(token, nonce=nonce) env_vars = sorted(os.environ.items()) return render_template("callback.html", access_token=token.get("access_token"), id_token=token.get("id_token"), user_info=claims, env_vars=env_vars) @app.route("/env", endpoint="show_env_html") def show_env_html(): env_vars = sorted(os.environ.items()) return render_template("env.html", env_vars=env_vars) if __name__ == "__main__": app.run(debug=True, port=5005)