搭建自己的 SSO 服务器 – Part 1: 实现登录 + 认证 API

引言

在数字化时代,随着企业和应用程序的增多,如何管理用户身份和授权访问已经成为一个亟待解决的重要问题。传统的身份验证机制存在一定的局限性,无法满足现代应用程序对安全、灵活性和可扩展性的需求。因此, 身份与访问管理(IAM) 成为现代系统架构的核心部分,它不仅涉及用户身份的管理,还包括对应用和资源的访问控制。

在这一系列博客中,我将带你一起研究如何构建一个 SSO(Single Sign-On) 系统,这不仅仅是一个简单的身份验证功能,更是一个强大的 身份管理(IAM) 系统的基础。通过构建自己的 SSO 服务器,我们可以深入理解以下几个关键概念:

☑️ SSO:让用户通过一次登录实现对多个应用系统的访问。

☑️ IAM:提供一种集中管理和控制身份认证、授权和审计的机制。

☑️ RBAC(基于角色的访问控制):根据用户的角色和权限管理他们对资源的访问控制,确保系统中的每个用户都能按照授权的权限访问相应的资源。



在本系列的第一部分中,我们将实现一个基础的 SSO 登录 + 认证 API,为构建一个高效、安全的身份管理系统奠定基础。随着系列的深入,我们会逐步扩展功能,探讨如何实现复杂的身份验证机制、角色管理、权限控制等 IAM 和 RBAC 相关的功能,帮助你全面了解一个现代企业级应用系统的身份管理架构。

在本篇中,我们将:

  • 使用 Flask 搭建后端服务器。
  • 实现 用户注册登录 API。
  • 使用 JWT(JSON Web Token) 进行身份认证。
  • 实现 基于 Token 的认证 来验证用户身份。

让我们一步步来看如何实现这一过程。

使用的技术

  • Flask:轻量级的 Python Web 框架,适合用于快速构建 RESTful API。
  • MySQL:流行的关系型数据库管理系统,用于存储用户数据。
  • bcrypt:一种安全的密码哈希算法,用于保护用户的密码。
  • JWT:一种紧凑的 URL 安全方式来表示两方之间传输的声明。
  • Logging:日志追踪,用于调试和错误追踪。

Python 依赖安装

pip install flask mysql-connector-python bcrypt pyjwt

步骤1 – 创建数据库与表

在实现 SSO 系统之前,我们首先需要创建一个 MySQL 数据库来存储用户信息。以下是创建数据库和表的步骤:

1. 创建数据库

首先,在 MySQL 中创建一个数据库来存储用户数据:

CREATE DATABASE oauth2_db;

2. 创建用户表

接着,创建一个 users 表来存储用户的基本信息和哈希密码。这个表将包含两个字段:idpassword_hashid 是主键,password_hash 用于存储用户的密码哈希。

CREATE DATABASE oauth2_db;

USE oauth2_db;

CREATE TABLE users (
    id INT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(255) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

步骤2 – 搭建 Flask 应用

我们首先搭建一个 Flask 应用,这个应用将作为我们的 SSO 服务器后端。我们将创建三个主要的接口:

☑️ /register – 用于用户注册,用户提供用户名和密码。

☑️ /login – 用于用户登录并生成 JWT Token。

☑️ /userinfo – 基于提供的 JWT Token 认证用户并返回用户信息。

接下来,我们来逐步分析每一部分的代码。

1. 用户注册

/register 接口负责用户注册。它接收一个包含用户名和密码的 POST 请求。密码将使用 bcrypt 算法进行哈希处理,然后将用户信息存储到数据库中。

# 1️⃣ User registration
@app.route('/register', methods=['POST'])
def register():
    logger.debug("Request received at /register endpoint")
    
    # Get request data
    data = request.json
    logger.debug(f"Request Data: {data}")

    username = data.get("username")
    password = data.get("password")

    if not username or not password:
        logger.debug('Username or password is missing')
        return jsonify({"error": "Username and password cannot be empty"}), 400

    # Hash the password
    hashed_password = bcrypt.hashpw(password.encode(), bcrypt.gensalt())
    logger.debug(f"Hashed password: {hashed_password}")

    try:
        # Insert data into the database
        cursor.execute("INSERT INTO users (username, password_hash) VALUES (%s, %s)", (username, hashed_password))
        db.commit()
        logger.debug('User registered successfully')
        return jsonify({"message": "Registration successful"}), 201
    except mysql.connector.errors.IntegrityError as e:
        logger.error(f"Error: {e}")
        return jsonify({"error": "Username already exists"}), 400
    except mysql.connector.Error as err:
        logger.error(f"Database error: {err}")
        return jsonify({"error": "Database error"}), 500

关键步骤

  • 检查用户名和密码是否为空。
  • 使用 bcrypt 对密码进行哈希。
  • 将用户信息插入到数据库中。

FAQ-1. 为什么密码需要以哈希的形式存储?
1. 不可逆性:如果数据库只存明文密码,一旦泄露,攻击者可以直接使用。使用哈希存储,攻击者即使拿到数据库,也无法直接获取密码,必须进行计算破解(如暴力破解或字典攻击)。
2. 最小化泄露影响如果某个用户的密码泄露,攻击者无法简单地用它去尝试登录其他系统(防止密码重用风险)。如果数据库存的是哈希值,即使某个用户的哈希被破解,其他用户的哈希值仍然安全。

FAQ-2. 使用哈希的形式存储密码就安全了吗?
答案是NO。哈希存储仍然面临 彩虹表攻击预计算哈希值的查表攻击)。
1. 彩虹表攻击的原理:攻击者可以预计算一组常见密码的哈希值,并存入彩虹表。如果数据库存的是没有加盐的哈希,攻击者可以直接查表找到对应的原始密码。
2. 加盐的作用:盐是一个随机生成的字符串,每个用户的密码都会带上一个独特的盐值后再进行哈希计算。这样,即使两个用户使用相同的密码,它们的哈希值也会不同。这使得彩虹表失效,因为攻击者无法预计算所有可能的盐值组合。

FAQ-3. 为什么我们使用 bcrypt 库?
bcrypt 是专门为密码存储设计的哈希算法,相比通用哈希函数(如 SHA-256),它在安全性上有以下优势:
1. 内置 Salt,防止彩虹表攻击:bcrypt 自动生成并存储 Salt,开发者无需手动处理,避免了因 Salt 处理不当导致的安全漏洞。
2. 适应性强(Work Factor,可调成本):bcrypt 提供了可配置的计算成本(Work Factor / 轮数),即使计算能力提升,仍然可以增加计算复杂度,增强抗暴力破解能力。bcrypt 的工作因子可以使计算比 SHA-256 之类的哈希慢很多,减缓暴力破解速度。
3. 设计上防御 GPU/ASIC 加速破解:现代破解手段主要利用 GPU/ASIC 进行大规模并行计算,例如暴力破解普通哈希(如 SHA-256)的速度远快于 CPU。 bcrypt 故意设计成计算密集型,并依赖 CPU 资源,使得 GPU/ASIC 失去并行计算优势,提高破解成本。
4. 其他推荐的密码哈希库:argon2 是 2015 年密码哈希竞赛的冠军,设计上比 bcrypt 更先进,安全性更高。 它优化了抗 GPU 攻击能力,并增加了内存硬性要求,让破解成本更高。 Python 可以使用 argon2-cffi 库。

2. 用户登录与 Token 生成

/login 接口允许用户使用用户名和密码登录。如果验证成功,会生成一个 JWT Token,并将其返回给客户端。

# 2️⃣ User login (returns JWT Token)
@app.route('/login', methods=['POST'])
def login():
    logger.debug('Received request at /login endpoint')
    
    # Get request data
    data = request.json
    logger.debug(f"Request Data: {data}")

    username = data.get("username")
    password = data.get("password")

    cursor.execute("SELECT id, password_hash FROM users WHERE username = %s", (username,))
    user = cursor.fetchone()

    if user and bcrypt.checkpw(password.encode(), user[1].encode()):
        # Generate JWT Token
        token_payload = {
            "user_id": user[0],
            "username": username,
            "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)  # 1 hour validity
        }
        token = jwt.encode(token_payload, app.config["SECRET_KEY"], algorithm="HS256")
        logger.debug(f"Generated JWT Token: {token}")
        return jsonify({"access_token": token})

    logger.debug('Invalid username or password')
    return jsonify({"error": "Invalid username or password"}), 401

关键步骤

  • 从数据库中获取用户信息。
  • 使用 bcrypt 验证密码是否匹配。
  • 生成 JWT Token 并返回。

3. 用户认证

/userinfo 接口用于通过 JWT Token 来验证用户身份。客户端需要在 Authorization 头中提供 Bearer Token。如果 Token 验证成功,返回用户信息。

# 3️⃣ Authenticate JWT Token (get user information)
@app.route('/userinfo', methods=['GET'])
def userinfo():
    logger.debug('Received request at /userinfo endpoint')

    token = request.headers.get("Authorization")
    if not token:
        logger.debug('No token provided')
        return jsonify({"error": "Token not provided"}), 401
    
    try:
        token = token.split("Bearer ")[-1]
        payload = jwt.decode(token, app.config["SECRET_KEY"], algorithms=["HS256"])
        logger.debug(f"Token payload: {payload}")
        return jsonify({"user_id": payload["user_id"], "username": payload["username"]})
    except jwt.ExpiredSignatureError:
        logger.error('Token has expired')
        return jsonify({"error": "Token has expired"}), 401
    except jwt.InvalidTokenError:
        logger.error('Invalid token')
        return jsonify({"error": "Invalid token"}), 401

关键步骤

  • Authorization 头中提取 Token。
  • 解码并验证 Token 的有效性。
  • 返回用户信息,如果 Token 无效或过期,返回相应的错误信息。

接口测试

实现了这些接口后,我们可以使用 curlPostman 来测试这些接口。

用户注册

curl -X POST http://127.0.0.1:5001/register -H "Content-Type: application/json" -d '{"username": "testuser", "password": "password123"}'

用户登录

curl -X POST http://127.0.0.1:5001/login -H "Content-Type: application/json" -d '{"username": "testuser", "password": "password123"}'

返回字段 access_token 就是我们认证服务器生产的 JWT Token,解密此 token 可以在 payload 中看到认证的用户信息

获取用户信息(需要 JWT Token)

curl -X GET http://127.0.0.1:5001/userinfo -H "Authorization: Bearer <your_jwt_token>"

总结

在本篇博客中,我们成功地实现了一个简单的 SSO 服务器的基础功能,包括用户注册、登录以及基于 JWT 的认证。我们使用了 Flask 框架搭建了后端应用,使用 bcrypt 对密码进行了安全的存储和验证。

下一部分我们将继续扩展这个系统,加入更多的功能,比如 Token 过期处理刷新 Token 等。