搭建自己的 SSO 服务器 – Part 1: 实现登录 + 认证 API
引言
在数字化时代,随着企业和应用程序的增多,如何管理用户身份和授权访问已经成为一个亟待解决的重要问题。传统的身份验证机制存在一定的局限性,无法满足现代应用程序对安全、灵活性和可扩展性的需求。因此, 身份与访问管理(IAM) 成为现代系统架构的核心部分,它不仅涉及用户身份的管理,还包括对应用和资源的访问控制。
在这一系列博客中,我将带你一起研究如何构建一个 SSO(Single Sign-On) 系统,这不仅仅是一个简单的身份验证功能,更是一个强大的 身份管理(IAM) 系统的基础。通过构建自己的 SSO 服务器,我们可以深入理解以下几个关键概念:
☑️ SSO:让用户通过一次登录实现对多个应用系统的访问。
☑️ IAM:提供一种集中管理和控制身份认证、授权和审计的机制。
☑️ RBAC(基于角色的访问控制):根据用户的角色和权限管理他们对资源的访问控制,确保系统中的每个用户都能按照授权的权限访问相应的资源。
在本系列的第一部分中,我们将实现一个基础的 SSO 登录 + 认证 API,为构建一个高效、安全的身份管理系统奠定基础。随着系列的深入,我们会逐步扩展功能,探讨如何实现复杂的身份验证机制、角色管理、权限控制等 IAM 和 RBAC 相关的功能,帮助你全面了解一个现代企业级应用系统的身份管理架构。
在本篇中,我们将:
- 使用 Flask 搭建后端服务器。
- 实现 用户注册 和 登录 API。
- 使用 JWT(JSON Web Token) 进行身份认证。
- 实现 基于 Token 的认证 来验证用户身份。
让我们一步步来看如何实现这一过程。
使用的技术
- Flask:轻量级的 Python Web 框架,适合用于快速构建 RESTful API。
- MySQL:流行的关系型数据库管理系统,用于存储用户数据。
- bcrypt:一种安全的密码哈希算法,用于保护用户的密码。
- JWT:一种紧凑的 URL 安全方式来表示两方之间传输的声明。
- Logging:日志追踪,用于调试和错误追踪。
Python 依赖安装
pip install flask mysql-connector-python bcrypt pyjwt
步骤1 – 创建数据库与表
在实现 SSO 系统之前,我们首先需要创建一个 MySQL 数据库来存储用户信息。以下是创建数据库和表的步骤:
1. 创建数据库
首先,在 MySQL 中创建一个数据库来存储用户数据:
CREATE DATABASE oauth2_db;
2. 创建用户表
接着,创建一个 users
表来存储用户的基本信息和哈希密码。这个表将包含两个字段:id
和 password_hash
。id
是主键,password_hash
用于存储用户的密码哈希。
CREATE DATABASE oauth2_db; USE oauth2_db; CREATE TABLE users ( id INT PRIMARY KEY AUTO_INCREMENT, username VARCHAR(255) UNIQUE NOT NULL, password_hash VARCHAR(255) NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP );

步骤2 – 搭建 Flask 应用
我们首先搭建一个 Flask 应用,这个应用将作为我们的 SSO 服务器后端。我们将创建三个主要的接口:
☑️ /register – 用于用户注册,用户提供用户名和密码。
☑️ /login – 用于用户登录并生成 JWT Token。
☑️ /userinfo – 基于提供的 JWT Token 认证用户并返回用户信息。
接下来,我们来逐步分析每一部分的代码。
1. 用户注册
/register 接口负责用户注册。它接收一个包含用户名和密码的 POST 请求。密码将使用 bcrypt 算法进行哈希处理,然后将用户信息存储到数据库中。
# 1️⃣ User registration @app.route('/register', methods=['POST']) def register(): logger.debug("Request received at /register endpoint") # Get request data data = request.json logger.debug(f"Request Data: {data}") username = data.get("username") password = data.get("password") if not username or not password: logger.debug('Username or password is missing') return jsonify({"error": "Username and password cannot be empty"}), 400 # Hash the password hashed_password = bcrypt.hashpw(password.encode(), bcrypt.gensalt()) logger.debug(f"Hashed password: {hashed_password}") try: # Insert data into the database cursor.execute("INSERT INTO users (username, password_hash) VALUES (%s, %s)", (username, hashed_password)) db.commit() logger.debug('User registered successfully') return jsonify({"message": "Registration successful"}), 201 except mysql.connector.errors.IntegrityError as e: logger.error(f"Error: {e}") return jsonify({"error": "Username already exists"}), 400 except mysql.connector.Error as err: logger.error(f"Database error: {err}") return jsonify({"error": "Database error"}), 500
关键步骤:
- 检查用户名和密码是否为空。
- 使用 bcrypt 对密码进行哈希。
- 将用户信息插入到数据库中。
❓ FAQ-1. 为什么密码需要以哈希的形式存储?
1. 不可逆性:如果数据库只存明文密码,一旦泄露,攻击者可以直接使用。使用哈希存储,攻击者即使拿到数据库,也无法直接获取密码,必须进行计算破解(如暴力破解或字典攻击)。
2. 最小化泄露影响:如果某个用户的密码泄露,攻击者无法简单地用它去尝试登录其他系统(防止密码重用风险)。如果数据库存的是哈希值,即使某个用户的哈希被破解,其他用户的哈希值仍然安全。
❓ FAQ-2. 使用哈希的形式存储密码就安全了吗?
答案是NO。哈希存储仍然面临 彩虹表攻击(预计算哈希值的查表攻击)。
1. 彩虹表攻击的原理:攻击者可以预计算一组常见密码的哈希值,并存入彩虹表。如果数据库存的是没有加盐的哈希,攻击者可以直接查表找到对应的原始密码。
2. 加盐的作用:盐是一个随机生成的字符串,每个用户的密码都会带上一个独特的盐值后再进行哈希计算。这样,即使两个用户使用相同的密码,它们的哈希值也会不同。这使得彩虹表失效,因为攻击者无法预计算所有可能的盐值组合。
❓ FAQ-3. 为什么我们使用 bcrypt 库?
bcrypt 是专门为密码存储设计的哈希算法,相比通用哈希函数(如 SHA-256),它在安全性上有以下优势:
1. 内置 Salt,防止彩虹表攻击:bcrypt 自动生成并存储 Salt,开发者无需手动处理,避免了因 Salt 处理不当导致的安全漏洞。
2. 适应性强(Work Factor,可调成本):bcrypt 提供了可配置的计算成本(Work Factor / 轮数),即使计算能力提升,仍然可以增加计算复杂度,增强抗暴力破解能力。bcrypt 的工作因子可以使计算比 SHA-256 之类的哈希慢很多,减缓暴力破解速度。
3. 设计上防御 GPU/ASIC 加速破解:现代破解手段主要利用 GPU/ASIC 进行大规模并行计算,例如暴力破解普通哈希(如 SHA-256)的速度远快于 CPU。 bcrypt 故意设计成计算密集型,并依赖 CPU 资源,使得 GPU/ASIC 失去并行计算优势,提高破解成本。
4. 其他推荐的密码哈希库:argon2 是 2015 年密码哈希竞赛的冠军,设计上比 bcrypt 更先进,安全性更高。 它优化了抗 GPU 攻击能力,并增加了内存硬性要求,让破解成本更高。 Python 可以使用 argon2-cffi 库。
2. 用户登录与 Token 生成
/login 接口允许用户使用用户名和密码登录。如果验证成功,会生成一个 JWT Token,并将其返回给客户端。
# 2️⃣ User login (returns JWT Token) @app.route('/login', methods=['POST']) def login(): logger.debug('Received request at /login endpoint') # Get request data data = request.json logger.debug(f"Request Data: {data}") username = data.get("username") password = data.get("password") cursor.execute("SELECT id, password_hash FROM users WHERE username = %s", (username,)) user = cursor.fetchone() if user and bcrypt.checkpw(password.encode(), user[1].encode()): # Generate JWT Token token_payload = { "user_id": user[0], "username": username, "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1) # 1 hour validity } token = jwt.encode(token_payload, app.config["SECRET_KEY"], algorithm="HS256") logger.debug(f"Generated JWT Token: {token}") return jsonify({"access_token": token}) logger.debug('Invalid username or password') return jsonify({"error": "Invalid username or password"}), 401
关键步骤:
- 从数据库中获取用户信息。
- 使用 bcrypt 验证密码是否匹配。
- 生成 JWT Token 并返回。
3. 用户认证
/userinfo 接口用于通过 JWT Token 来验证用户身份。客户端需要在 Authorization 头中提供 Bearer Token。如果 Token 验证成功,返回用户信息。
# 3️⃣ Authenticate JWT Token (get user information) @app.route('/userinfo', methods=['GET']) def userinfo(): logger.debug('Received request at /userinfo endpoint') token = request.headers.get("Authorization") if not token: logger.debug('No token provided') return jsonify({"error": "Token not provided"}), 401 try: token = token.split("Bearer ")[-1] payload = jwt.decode(token, app.config["SECRET_KEY"], algorithms=["HS256"]) logger.debug(f"Token payload: {payload}") return jsonify({"user_id": payload["user_id"], "username": payload["username"]}) except jwt.ExpiredSignatureError: logger.error('Token has expired') return jsonify({"error": "Token has expired"}), 401 except jwt.InvalidTokenError: logger.error('Invalid token') return jsonify({"error": "Invalid token"}), 401
关键步骤:
- 从 Authorization 头中提取 Token。
- 解码并验证 Token 的有效性。
- 返回用户信息,如果 Token 无效或过期,返回相应的错误信息。
接口测试
实现了这些接口后,我们可以使用 curl 或 Postman 来测试这些接口。
用户注册:
curl -X POST http://127.0.0.1:5001/register -H "Content-Type: application/json" -d '{"username": "testuser", "password": "password123"}'

用户登录:
curl -X POST http://127.0.0.1:5001/login -H "Content-Type: application/json" -d '{"username": "testuser", "password": "password123"}'

返回字段 access_token 就是我们认证服务器生产的 JWT Token,解密此 token 可以在 payload 中看到认证的用户信息

获取用户信息(需要 JWT Token):
curl -X GET http://127.0.0.1:5001/userinfo -H "Authorization: Bearer <your_jwt_token>"

总结
在本篇博客中,我们成功地实现了一个简单的 SSO 服务器的基础功能,包括用户注册、登录以及基于 JWT 的认证。我们使用了 Flask 框架搭建了后端应用,使用 bcrypt 对密码进行了安全的存储和验证。
下一部分我们将继续扩展这个系统,加入更多的功能,比如 Token 过期处理、刷新 Token 等。