懒猫微服开发篇(五):懒猫微服如何使用 OpenID Connect (OIDC)?(下)

先决条件:

需要在lzc-manifest.yml定义 oidc_redirect_path 和 environment。

配置lzc-manifest.yml

oidc_redirect_path 就是你的应用的回调地址,只有写了这个之后才能正确使用 OpenID Connect 的环境变量。

回调地址是按照应用而定的,有的是/callback,/oidc/callback 或者/oauth/callback。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
lzc-sdk-version: 0.1
name: 懒猫ENV查看器
package: xu.deploy.env
version: 0.0.2
description:
license: https://choosealicense.com/licenses/mit/
homepage:
author: xu
application:
subdomain: env
oidc_redirect_path: /callback
routes:
- /=exec://5005,./lzcapp/pkg/content/run.sh
environment:
- LAZYCAT_AUTH_OIDC_CLIENT_ID=${LAZYCAT_AUTH_OIDC_CLIENT_ID}
- LAZYCAT_AUTH_OIDC_CLIENT_SECRET=${LAZYCAT_AUTH_OIDC_CLIENT_SECRET}
- LAZYCAT_AUTH_OIDC_AUTH_URI=${LAZYCAT_AUTH_OIDC_AUTH_URI}
- LAZYCAT_AUTH_OIDC_TOKEN_URI=${LAZYCAT_AUTH_OIDC_TOKEN_URI}
- LAZYCAT_AUTH_OIDC_USERINFO_URI=${LAZYCAT_AUTH_OIDC_USERINFO_URI}
- LAZYCAT_AUTH_OIDC_ISSUER_URI=${LAZYCAT_AUTH_OIDC_ISSUER_URI}

定义了环境变量之后,我们就可以在代码中使用环境变量:

开机时一次性从环境变量读取懒猫微服的应用域名、OIDC 客户端 ID/密钥,以及授权、令牌、用户信息三个核心端点,并根据应用域名拼出默认 Redirect URI,从而把所有与 OpenID Connect 登录相关的敏感信息解耦。

这里的 callback 是应用的回调 URL,需要根据应用调整。

1
2
3
4
5
6
7
8
9
LAZYCAT_BOX_DOMAIN = os.environ.get('LAZYCAT_BOX_DOMAIN')
LAZYCAT_APP_DOMAIN = os.environ.get('LAZYCAT_APP_DOMAIN')

CLIENT_ID = os.getenv("LAZYCAT_AUTH_OIDC_CLIENT_ID")
CLIENT_SECRET = os.getenv("LAZYCAT_AUTH_OIDC_CLIENT_SECRET")
AUTH_ENDPOINT = os.getenv("LAZYCAT_AUTH_OIDC_AUTH_URI")
TOKEN_ENDPOINT = os.getenv("LAZYCAT_AUTH_OIDC_TOKEN_URI")
USERINFO_ENDPOINT = os.getenv("LAZYCAT_AUTH_OIDC_USERINFO_URI")
REDIRECT_URI = os.getenv("OIDC_REDIRECT_URI", f"https://{LAZYCAT_APP_DOMAIN}/callback")

登录功能

在用户访问 /login 时动态生成一对 PKCE 凭据(随机 code verifier 和其 SHA-256 派生的 code challenge),把 verifier 暂存进会话,再携带 challenge 等参数构造 OIDC 授权码请求,并将用户浏览器重定向到身份提供方完成安全登录;回调阶段可用 session 中的 code verifier 与返回的 code exchange 配合,防止授权码被劫持或重放,从而提升 OAuth 2.0/OIDC 的安全性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# ======= 生成 PKCE Code Verifier & Challenge =======
def generate_pkce_pair():
code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b'=').decode()
code_challenge = base64.urlsafe_b64encode(
hashlib.sha256(code_verifier.encode()).digest()
).rstrip(b'=').decode()
return code_verifier, code_challenge

@app.route('/login')
def login():
code_verifier, code_challenge = generate_pkce_pair()
session['code_verifier'] = code_verifier

auth_url = (
f"{AUTH_ENDPOINT}"
f"?response_type=code"
f"&client_id={CLIENT_ID}"
f"&redirect_uri={REDIRECT_URI}"
f"&scope=openid profile email"
f"&code_challenge={code_challenge}"
f"&code_challenge_method=S256"
)
print(auth_url)
return redirect(auth_url)
  1. generate_pkce_pair()
    • 使用 secrets.token_bytes(32) 随机生成 32 字节高强度随机数。
    • 先经 base64.urlsafe_b64encode 再去掉尾部 = 得到 code verifier
    • 对 code verifier 做 SHA-256 散列后再次 base64 URL 安全编码并去掉 =,得到 code challenge
    • 返回二元组 (code_verifier, code_challenge)
  2. /login 路由
    • 调用 generate_pkce_pair() 生成并拿到 code_verifiercode_challenge
    • code_verifier 写入 Flask session,以便稍后在回调时校验。
    • 拼接授权端点 AUTH_ENDPOINT 形成认证 URL:
      • response_type=code 采用授权码模式
      • client_idredirect_uriscope 等常规 OIDC 参数
      • code_challengecode_challenge_method=S256 声明使用 PKCE(S256)
    • redirect(auth_url) 将浏览器跳转到身份提供方进行登录 + 授权

回调地址

/callback 处理函数先从回调参数取出授权码,再用会话里的 code verifier 按 PKCE + 授权码模式向令牌端点换取 access token 和 ID token;成功后用 access token 调 /userinfo 获取用户资料,并把三者一起返回。如此既完成了 OAuth 2.0 的安全换码,又拿到了 OIDC 提供的登录身份信息,实现前后端分离的单点登录闭环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@app.route('/callback')
def callback():
code = request.args.get('code')
code_verifier = session.get('code_verifier')

# 请求 access token
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
data = {
'grant_type': 'authorization_code',
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET,
'code': code,
'redirect_uri': REDIRECT_URI,
'code_verifier': code_verifier,
}

token_resp = requests.post(TOKEN_ENDPOINT, data=data, headers=headers)
print(token_resp)
token_data = token_resp.json()
print(token_data)

access_token = token_data.get('access_token')
id_token = token_data.get('id_token')

# 获取用户信息
userinfo_resp = requests.get(USERINFO_ENDPOINT, headers={
'Authorization': f'Bearer {access_token}'
})

return {
'Access Token': access_token,
'ID Token': id_token,
'UserInfo': userinfo_resp.json()
}
  1. code = request.args.get('code')

    • 从回调 URL 查询参数中取出授权服务器返回的 code(授权码)。
  2. code_verifier = session.get('code_verifier')

    • 读取先前 /login 时存进会话的 code verifier,准备用于 PKCE 校验。
  3. 准备换取令牌的 HTTP POST 请求

    1
    2
    3
    4
    5
    6
    7
    8
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    data = {
    'grant_type': 'authorization_code', # 授权码模式
    'client_id': CLIENT_ID,
    'code': code, # 回调拿到的授权码
    'redirect_uri': REDIRECT_URI, # 必须与首跳一致
    'code_verifier': code_verifier, # PKCE 关键参数
    }
    • code_verifier 会被身份提供方与首跳收到的 code_challenge 做 SHA-256 对比,从而证明客户端的“持有者”身份,防止授权码被截获后被第三方滥用。
  4. token_resp = requests.post(TOKEN_ENDPOINT, data=data, headers=headers)

    • 向令牌端点发送表单数据以换取 Access Token / ID Token
  5. token_data = token_resp.json()

    • 解析 JSON 响应。例如:

      1
      2
      3
      4
      5
      6
      7
      {
      "access_token": "...",
      "id_token": "...",
      "expires_in": 3600,
      "token_type": "Bearer",
      ...
      }
  6. access_token = token_data.get('access_token')

    • 读取访问令牌,用于调用受保护 API。
      id_token = token_data.get('id_token')
    • 读取 OIDC ID Token,携带用户身份声明,可本地解码验证。
  7. 获取用户信息

    1
    2
    3
    4
    userinfo_resp = requests.get(
    USERINFO_ENDPOINT,
    headers={'Authorization': f'Bearer {access_token}'}
    )
    • 按 OIDC 规范,用 Bearer Token 调 /userinfo 端点,拿到 JSON 形式的用户信息。
  8. 返回聚合结果(此处直接返回给浏览器以便演示)

    1
    2
    3
    4
    5
    return {
    'Access Token': access_token,
    'ID Token': id_token,
    'UserInfo': userinfo_resp.json()
    }

在浏览器中可以看到这个登录跳转:

image-20250705095459340

还是这个图,我们继续看这个流程:

7a6f946c-b299-410b-8286-2958d488caa2

拿到 code 之后可以就可以换到 Access token 和 ID Token 了,这个 code 只有一次有效,可以达到防重放的效果。当然这个只是 OIDC 的一个例子,在生产环境的 APP 中还需要做路由守卫以及 access 续签的操作。

image-20250705100147125

完整代码如下:
  1. 读取配置
    • 从环境变量获取 Lazycat 平台的域名、OIDC 客户端 ID/密钥,以及授权端点、令牌端点、用户信息端点和回调地址。
    • 用随机 app.secret_key 支持 Flask Session。
  2. PKCE 安全增强
    • generate_pkce_pair() 动态生成 code_verifier / code_challenge ;后者随登录请求携带,前者保存在 Session,回调时再带给 Token 端点,防止授权码被劫持。
  3. 核心路由
    • /:渲染首页(需自备 index.html)。
    • /login
      1. 生成 PKCE 对;
      2. 拼接授权 URL(response_type=code,scope 含 openid profile email);
      3. 浏览器重定向到 IdP 登录/授权页面。
    • /callback
      1. 取回 codecode_verifier
      2. POST 到 TOKEN_ENDPOINT 换取 access_tokenid_token
      3. access_token 调用 USERINFO_ENDPOINT 拿到用户信息;
      4. 以 JSON 形式返回令牌与用户资料。

注意:生产环境应关闭 debug=True、使用 HTTPS、校验 state 参数防 CSRF,并妥善处理 Token 异常和错误分支。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import os
import base64
import hashlib
import secrets
import requests
from flask import Flask, redirect, request, session, url_for, render_template

app = Flask(__name__)
app.secret_key = os.urandom(24)

LAZYCAT_BOX_DOMAIN = os.environ.get('LAZYCAT_BOX_DOMAIN')
LAZYCAT_APP_DOMAIN = os.environ.get('LAZYCAT_APP_DOMAIN')

CLIENT_ID = os.getenv("LAZYCAT_AUTH_OIDC_CLIENT_ID")
CLIENT_SECRET = os.getenv("LAZYCAT_AUTH_OIDC_CLIENT_SECRET")
AUTH_ENDPOINT = os.getenv("LAZYCAT_AUTH_OIDC_AUTH_URI")
TOKEN_ENDPOINT = os.getenv("LAZYCAT_AUTH_OIDC_TOKEN_URI")
USERINFO_ENDPOINT = os.getenv("LAZYCAT_AUTH_OIDC_USERINFO_URI")
REDIRECT_URI = os.getenv("OIDC_REDIRECT_URI", f"https://{LAZYCAT_APP_DOMAIN}/callback")


print(CLIENT_ID)
print(CLIENT_SECRET)
print(AUTH_ENDPOINT)
print(TOKEN_ENDPOINT)
print(USERINFO_ENDPOINT)

print(REDIRECT_URI)

# ======= 生成 PKCE Code Verifier & Challenge =======
def generate_pkce_pair():
code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b'=').decode()
code_challenge = base64.urlsafe_b64encode(
hashlib.sha256(code_verifier.encode()).digest()
).rstrip(b'=').decode()
return code_verifier, code_challenge

# ======= 首页 =======
@app.route('/')
def index():
return render_template('index.html')

# ======= 跳转授权页 =======
@app.route('/login')
def login():
code_verifier, code_challenge = generate_pkce_pair()
session['code_verifier'] = code_verifier

auth_url = (
f"{AUTH_ENDPOINT}"
f"?response_type=code"
f"&client_id={CLIENT_ID}"
f"&redirect_uri={REDIRECT_URI}"
f"&scope=openid profile email"
f"&code_challenge={code_challenge}"
f"&code_challenge_method=S256"
)
print(auth_url)
return redirect(auth_url)

# ======= 回调处理 =======
@app.route('/callback')
def callback():
code = request.args.get('code')
code_verifier = session.get('code_verifier')

# 请求 access token
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
data = {
'grant_type': 'authorization_code',
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET,
'code': code,
'redirect_uri': REDIRECT_URI,
'code_verifier': code_verifier,
}

token_resp = requests.post(TOKEN_ENDPOINT, data=data, headers=headers)
print(token_resp)
token_data = token_resp.json()
print(token_data)

access_token = token_data.get('access_token')
id_token = token_data.get('id_token')

# 获取用户信息
userinfo_resp = requests.get(USERINFO_ENDPOINT, headers={
'Authorization': f'Bearer {access_token}'
})

return {
'Access Token': access_token,
'ID Token': id_token,
'UserInfo': userinfo_resp.json()
}

if __name__ == '__main__':
app.run(debug=True)

如果使用 authlib 是这样子的.通过 Authlib 把应用接入 OIDC:

启动时先从环境变量读取并校验客户端 ID、密钥及各端点;

随后注册 OIDC 客户端并自动启用 PKCE。

用户访问 /login 时,服务端生成 nonce 并调用 authorize_redirect() 将浏览器跳转到身份提供方登录,同时在会话里保存随机值;身份提供方完成认证后回调到 /callbackauthorize_access_token() 会携带先前的 code 和 code verifier 去换取 access token / ID token,并用保存的 nonce 校验 ID Token 防止重放。

成功后解析得到的声明(用户信息)渲染或写入 Session,即可认为用户已登录。如此利用现成库把 PKCE、状态验证、ID Token 验签等安全细节都交给框架处理,只需少量代码就实现了安全的单点登录闭环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import os, secrets
from flask import Flask, request, render_template
from authlib.integrations.flask_client import OAuth

LAZYCAT_BOX_DOMAIN = os.environ.get('LAZYCAT_BOX_DOMAIN')
LAZYCAT_APP_DOMAIN = os.environ.get('LAZYCAT_APP_DOMAIN')

CLIENT_ID = os.getenv("LAZYCAT_AUTH_OIDC_CLIENT_ID")
CLIENT_SECRET = os.getenv("LAZYCAT_AUTH_OIDC_CLIENT_SECRET")
AUTH_URI = os.getenv("LAZYCAT_AUTH_OIDC_AUTH_URI")
TOKEN_URI = os.getenv("LAZYCAT_AUTH_OIDC_TOKEN_URI")
USERINFO_URI = os.getenv("LAZYCAT_AUTH_OIDC_USERINFO_URI")
JWKS_URI = os.getenv("OIDC_JWKS_URI", f"https://{LAZYCAT_BOX_DOMAIN}/sys/oauth/keys")
REDIRECT_URI = os.getenv("OIDC_REDIRECT_URI", f"https://{LAZYCAT_APP_DOMAIN}/callback")

required = [CLIENT_ID, CLIENT_SECRET, AUTH_URI, TOKEN_URI, USERINFO_URI, JWKS_URI, REDIRECT_URI]
if not all(required):
missing = [k for k, v in zip(
["OIDC_CLIENT_ID","OIDC_CLIENT_SECRET","OIDC_AUTH_URI",
"OIDC_TOKEN_URI","OIDC_USERINFO_URI","OIDC_JWKS_URI",
"OIDC_REDIRECT_URI"], required) if not v]
raise RuntimeError(f"缺少环境变量: {', '.join(missing)}")

app = Flask(__name__)
app.secret_key = "a-very-secret-key"

oauth = OAuth(app)
oidc = oauth.register(
name="casdoor",
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
authorize_url=AUTH_URI,
access_token_url=TOKEN_URI,
userinfo_endpoint=USERINFO_URI,
jwks_uri=JWKS_URI,
client_kwargs={"scope": "openid profile email"},
redirect_uri=REDIRECT_URI,
)

@app.route("/")
def index():
return render_template("index.html")

@app.route("/login")
def login():
nonce = secrets.token_urlsafe(16)
resp = oidc.authorize_redirect(redirect_uri=REDIRECT_URI, nonce=nonce)
resp.set_cookie("oidc_nonce", nonce, max_age=300, httponly=True)
return resp

@app.route("/callback")
def callback():
token = oidc.authorize_access_token()
nonce = request.cookies.get("oidc_nonce")
claims = oidc.parse_id_token(token, nonce=nonce)
env_vars = sorted(os.environ.items())

return render_template("callback.html",
access_token=token.get("access_token"),
id_token=token.get("id_token"),
user_info=claims,
env_vars=env_vars)

@app.route("/env", endpoint="show_env_html")
def show_env_html():
env_vars = sorted(os.environ.items())
return render_template("env.html", env_vars=env_vars)

if __name__ == "__main__":
app.run(debug=True, port=5005)

懒猫微服开发篇(五):懒猫微服如何使用 OpenID Connect (OIDC)?(下)

https://xu-hardy.github.io/懒猫微服开发篇(五):懒猫微服如何使用-openid-connect-(oidc)?(下)/

作者

Xu

发布于

2025-07-05

更新于

2025-07-05

许可协议

评论