搭建自己的 SSO 服务器 – Part 2: 实现长期登录(Refresh Token)

引言

第一部分 中,我们成功实现了 SSO 服务器的基本登录和认证功能,允许用户通过用户名和密码获取 Access Token,并使用该 Token 访问受保护的资源。然而,目前的 Access Token 有效期仅为 1 小时,一旦过期,用户必须重新登录。

为了提升用户体验和安全性,我们将在本部分实现 Refresh Token 机制,允许用户在 Access Token 过期后,无需重新输入用户名和密码即可获取新的 Token。同时,我们还将支持 Refresh Token 的吊销(Revoke),确保用户可以安全地登出或管理员可以手动禁用某些会话。


现有 Token 机制的问题

目前,Access Token 有效期设置为 1 小时,但存在以下问题:

  1. 用户需要频繁登录,体验较差。
  2. 无法手动撤销 Access Token,一旦泄露,只能等它自然过期。
  3. 缺乏长期会话管理,当前 Token 机制并不能维持长期会话状态。

为了解决这些问题,我们引入 Refresh Token 机制,它主要解决了 第 1 个问题——减少用户频繁登录的需求。对于 Token 撤销(第 2 点),仅仅引入 Refresh Token 并不能直接解决,仍然需要配合应用自身的逻辑来管理 Token 的生效状态。会话管理(第 3 点) 也是一个更广泛的问题,Refresh Token 只是会话管理的一部分,完整的会话管理通常涉及更多机制,如设备管理、多重身份验证等。


数据库扩展

为了支持 Refresh Token,我们需要在数据库中新增 refresh_tokens 表,存储 Refresh Token 相关信息:

CREATE TABLE refresh_tokens (
    id INT AUTO_INCREMENT PRIMARY KEY,
    user_id INT NOT NULL,
    token VARCHAR(64) UNIQUE NOT NULL,  -- 存储 64 位随机 refresh token
    expiry DATETIME NOT NULL,           -- 过期时间
    revoked BOOLEAN DEFAULT FALSE,      -- 是否被撤销
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);

字段解析:

  • user_id:关联 users 表,表示该 Refresh Token 属于哪个用户。
  • token:存储 Refresh Token 本身。
  • created_at:记录 Refresh Token 创建时间。
  • revoked:标记该 Refresh Token 是否被吊销,默认 FALSE(未吊销)。

实现 Refresh Token 机制

1. 登录时存储 Refresh Token

在用户登录成功后,我们生成 Access Token(1 小时有效)和 Refresh Token(7 天有效),并将 Refresh Token 存入数据库。

2. 使用 Refresh Token 获取新的 Access Token

用户使用 Refresh Token 申请新的 Access Token 时,我们需要检查该 Token 是否存在且未被吊销。

@app.route('/login', methods=['POST'])
def token():
    data = request.json
    grant_type = data.get("grant_type")

    # --- 登录获取 token ---
    if grant_type == "password":
        username = data.get("username")
        password = data.get("password")

        cursor.execute("SELECT id, password_hash FROM users WHERE username = %s", (username,))
        user = cursor.fetchone()

        if user and bcrypt.checkpw(password.encode(), user[1].encode()):
            user_id = user[0]

            access_token = generate_access_token(user_id)
            refresh_token = generate_refresh_token(user_id)

            return jsonify({"access_token": access_token, "refresh_token": refresh_token})

        return jsonify({"error": "Invalid username or password"}), 401

    # --- 使用 refresh_token 刷新 access_token ---
    elif grant_type == "refresh_token":
        refresh_token = data.get("refresh_token")

        if not refresh_token:
            return jsonify({"error": "Refresh token is required"}), 400

        cursor.execute("SELECT user_id, expiry, revoked FROM refresh_tokens WHERE token = %s", (refresh_token,))
        token_record = cursor.fetchone()

        if not token_record:
            return jsonify({"error": "Invalid refresh token"}), 401

        user_id, expiry, revoked = token_record

        if revoked:
            return jsonify({"error": "Refresh token has been revoked"}), 401

        if datetime.datetime.utcnow() > expiry:
            return jsonify({"error": "Refresh token has expired"}), 401

        new_access_token = generate_access_token(user_id)
        return jsonify({"access_token": new_access_token})

    return jsonify({"error": "Invalid grant_type"}), 400

# 生成 access token
def generate_access_token(user_id):
    payload = {
        "user_id": user_id,
        "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=app.config["ACCESS_TOKEN_EXPIRY_HOURS"])
    }
    return jwt.encode(payload, app.config["SECRET_KEY"], algorithm="HS256")

# 生成 refresh token(非 JWT)
def generate_refresh_token(user_id):
    refresh_token = secrets.token_hex(32)
    expiry_time = datetime.datetime.utcnow() + datetime.timedelta(days=app.config["REFRESH_TOKEN_EXPIRY_DAYS"])
    cursor.execute("INSERT INTO refresh_tokens (user_id, token, expiry, revoked) VALUES (%s, %s, %s, %s)", 
                   (user_id, refresh_token, expiry_time, False))
    db.commit()
    return refresh_token

新旧代码对比

改动点旧代码新代码
接口路径/login/token
请求参数usernamepasswordgrant_type,支持 "password""refresh_token"
返回数据access_token (JWT)access_token (JWT) + refresh_token (非 JWT)
refresh_token 处理无 refresh_tokenrefresh_token 存数据库,可刷新 access_token
access_token 生成JWT,带 "username"JWT,仅存 "user_id"
refresh_token 结构生成 非 JWT 随机字符串,存数据库
数据库表结构仅查询 users额外维护 refresh_tokens,支持 token 撤销和过期
错误处理仅处理 登录失败额外处理 refresh_token 过期/撤销

具体改动点

1. 改用 /token 端点,支持 grant_type

旧代码 只支持 用户名密码登录

@app.route('/login', methods=['POST'])

新代码 统一用 /token 处理两种情况:

@app.route('/token', methods=['POST'])
  • "password" 模式:用户名 + 密码登录
  • "refresh_token" 模式:使用 refresh_token 刷新 access_token

2. 生成 refresh_token 并存数据库

旧代码 没有 refresh_token

token_payload = {
    "user_id": user[0],
    "username": username,
    "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)
}
token = jwt.encode(token_payload, app.config["SECRET_KEY"], algorithm="HS256")
return jsonify({"access_token": token})

新代码 增加 refresh_token 逻辑:

refresh_token = secrets.token_hex(32)  # 生成随机字符串
expiry_time = datetime.datetime.utcnow() + datetime.timedelta(days=7)
cursor.execute("INSERT INTO refresh_tokens (user_id, token, expiry, revoked) VALUES (%s, %s, %s, %s)", 
               (user_id, refresh_token, expiry_time, False))
db.commit()

return jsonify({"access_token": access_token, "refresh_token": refresh_token})

3. access_token 精简,仅存 user_id

旧代码 额外存储 "username"

token_payload = {
    "user_id": user[0],
    "username": username,
    "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)
}

新代码 只存 user_id,减少 JWT 体积:

token_payload = {
    "user_id": user_id,
    "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)
}

4. 处理 refresh_token 逻辑

旧代码 没有 refresh_token,无法续期 access_token
新代码 处理 grant_type="refresh_token"

elif grant_type == "refresh_token":
    refresh_token = data.get("refresh_token")

    cursor.execute("SELECT user_id, expiry, revoked FROM refresh_tokens WHERE token = %s", (refresh_token,))
    token_record = cursor.fetchone()

    if not token_record:
        return jsonify({"error": "Invalid refresh token"}), 401
    if token_record[2]:  # revoked
        return jsonify({"error": "Refresh token has been revoked"}), 401
    if datetime.datetime.utcnow() > token_record[1]:  # expired
        return jsonify({"error": "Refresh token has expired"}), 401

    new_access_token = generate_access_token(token_record[0])
    return jsonify({"access_token": new_access_token})

5. 旧数据库结构调整

users

CREATE TABLE users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    username VARCHAR(255) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL
);

refresh_tokens

CREATE TABLE refresh_tokens (
    id INT AUTO_INCREMENT PRIMARY KEY,
    user_id INT NOT NULL,
    token VARCHAR(64) UNIQUE NOT NULL,
    expiry DATETIME NOT NULL,
    revoked BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);
变更点旧代码新代码
登录接口/login/token
支持 refresh_token
数据库存储 refresh_token
JWT 体积优化❌ 存 username✅ 仅存 user_id
统一 /token 端口❌ 仅处理登录✅ 处理登录 & 刷新 token

3. 吊销 Refresh Token

提供一个 API 允许用户或管理员主动吊销 Refresh Token,防止被滥用。

@app.route('/revoke', methods=['POST'])
def revoke_token():
    data = request.json
    refresh_token = data.get("refresh_token")

    if not refresh_token:
        return jsonify({"error": "Refresh token is required"}), 400

    cursor.execute("UPDATE refresh_tokens SET revoked = TRUE WHERE token = %s", (refresh_token,))
    db.commit()

    return jsonify({"message": "Refresh token has been revoked"})

接口测试

用户注册

用户登录 – 密码grant_type=password):

用户登录 – 刷新令牌grant_type=refresh_token):

MySQL 表中的 Refresh Token 状态:

传递已过期 Refresh Token 时,接口返回结果:

❓ FAQ-1. Refresh Token 没绑定用户是否安全?行业内标准是怎样的?

问题说明:
当前代码中,refresh_token 是通过数据库与 user_id 关联的,但当客户端传来一个合法的 refresh_token 时,并没有进行额外的用户身份验证。换句话说,只要拿到 refresh_token,就可以换取新的 access_token,无视是谁的请求。

风险点:
• 如果 refresh token 泄露,他人可以在不知情的情况下换取用户的 access token。
• 没有绑定客户端、IP、UA 等额外上下文,无法防止 token 被跨环境使用。

行业标准:
OAuth2 标准并不强制要求 refresh_token 验证用户身份,但推荐与客户端或上下文绑定,以提高安全性。
OpenID Connect 的实现中,通常 refresh token 与 client_id、user_id 绑定,并可配置是否允许跨客户端使用。

❓ FAQ-2. 为什么 user_id 应该使用 GUID 而不是自增 ID?

问题说明:
当前 users 表的主键 id 为自增整数,refresh_tokens 表中的 user_id 外键指向它。我计划将主键改成 user_id(UUID / GUID),以提升可读性和一致性。

自增 ID 的缺点:
• 可预测:攻击者可通过遍历猜测 id=1,2,3… 访问其他用户的数据。
• 数据迁移难:在多节点数据库中会发生自增冲突。
• 安全性较低:不适合暴露在 URL 或外部 API 中。

GUID 的优势:

• 全局唯一,适合分布式系统。
• 更难被猜测,增强安全性。
• 更利于未来扩展到 RBAC、IAM 结构时的跨表联合。

吊销刷新令牌:

MySQL 表中的 Refresh Token 状态:

此时如果尝试使用 Refresh Token 兑换 Access Token 会报错提示刷新令牌已被吊销

❓ FAQ-3. Refresh Token 的过期与吊销,前端需要知道区别吗?

问题说明:
当前 refresh token 可为 “已过期” 或 “被吊销” 两种状态。接口返回提示中到底需不需要区分这两种状态。

结论:
对于客户端来说,这两种状态本质上都是 “token 无效”,返回一个统一的错误提示就足够了。


总结

在本部分中,我们:

  • 引入 Refresh Token 机制,减少用户频繁登录。
  • 存储 Refresh Token,支持多设备管理。
  • 实现 Refresh Token 吊销,提高安全性。

需要注意的是,Refresh Token 并不能直接解决 Token 撤销和完整的会话管理问题,这些仍然需要应用自身的逻辑来配合实现。