Skip to content

分布式锁与高并发

1. 业务场景

在微服务架构中,多个 Node.js 实例同时处理"抢购库存"请求,需要防止超卖。


2. Redis 分布式锁核心实现 🔥

错误做法

javascript
// ❌ 非原子操作,存在竞态条件
if (await redis.exists('lock')) {
    return 'locked';
}
await redis.set('lock', 'value');

正确做法

javascript
// ✅ 使用原子命令
const lockKey = 'lock:order:12345';
const lockValue = uuid.v4();  // 随机值,用于安全释放
const ttl = 30000;  // 30秒过期

const acquired = await redis.set(lockKey, lockValue, 'NX', 'PX', ttl);

if (acquired) {
    console.log('Lock acquired');
} else {
    console.log('Failed to acquire lock');
}

关键参数

参数说明
NXNot Exist - 只有 key 不存在时才设置
PX 3000030秒过期 - 防止死锁的关键
Random Value随机值 - 防止误释放他人的锁

3. 解锁的原子性 (Lua 脚本) 🔥

错误做法

javascript
// ❌ 两步操作不是原子的
const value = await redis.get(lockKey);
if (value === myValue) {
    await redis.del(lockKey);  // 可能删除别人的锁!
}

问题: GET 后锁可能刚好过期,被别人获取,然后你 DEL 了别人的锁。

正确做法

javascript
// ✅ Lua 脚本原子执行
const unlockScript = `
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
`;

const result = await redis.eval(unlockScript, 1, lockKey, lockValue);
if (result === 1) {
    console.log('Lock released');
}

4. 完整封装

javascript
class RedisLock {
    constructor(redis) {
        this.redis = redis;
        this.unlockScript = `
            if redis.call("get", KEYS[1]) == ARGV[1] then
                return redis.call("del", KEYS[1])
            else
                return 0
            end
        `;
    }

    async acquire(key, ttl = 30000) {
        const value = require('crypto').randomUUID();
        const acquired = await this.redis.set(
            key, value, 'NX', 'PX', ttl
        );
        
        if (acquired) {
            return { success: true, value };
        }
        return { success: false };
    }

    async release(key, value) {
        const result = await this.redis.eval(
            this.unlockScript, 1, key, value
        );
        return result === 1;
    }

    async withLock(key, fn, ttl = 30000) {
        const lock = await this.acquire(key, ttl);
        if (!lock.success) {
            throw new Error('Failed to acquire lock');
        }
        
        try {
            return await fn();
        } finally {
            await this.release(key, lock.value);
        }
    }
}

// 使用示例
const redisLock = new RedisLock(redis);

await redisLock.withLock('order:12345', async () => {
    // 临界区代码
    await decreaseStock(productId);
});

5. Redlock 算法 (进阶) 🔥

问题

单点 Redis 存在风险:Master 挂了还没同步给 Slave,锁可能丢失。

Redlock 思想

向 N 个独立 Redis 实例依次申请锁:

┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐
│ Redis 1 │  │ Redis 2 │  │ Redis 3 │  │ Redis 4 │  │ Redis 5 │
│   ✓     │  │   ✓     │  │   ✓     │  │   ✗     │  │   ✗     │
└─────────┘  └─────────┘  └─────────┘  └─────────┘  └─────────┘
获取成功: 3 个  >  5/2 + 1 = 3 个  ✓ 获取锁成功

成功条件

  1. > N/2 + 1 个实例上成功获取锁
  2. 总耗时 < 锁有效期
javascript
const Redlock = require('redlock');

const redlock = new Redlock(
    [redis1, redis2, redis3, redis4, redis5],
    {
        retryCount: 3,
        retryDelay: 200
    }
);

try {
    const lock = await redlock.acquire(['resource:key'], 30000);
    
    // 临界区
    await doSomething();
    
    await lock.release();
} catch (err) {
    console.log('Failed to acquire lock');
}

6. 兜底策略 🔥

IMPORTANT

即使有分布式锁,数据库层也应加上最后防线!

乐观锁 (Version)

sql
UPDATE products 
SET stock = stock - 1, version = version + 1
WHERE id = ? AND version = ?

数据库约束

sql
UPDATE products 
SET stock = stock - 1 
WHERE id = ? AND stock > 0  -- 防止负库存

完整流程

1. Redis 分布式锁 (应用层快速拦截)

2. 数据库 WHERE stock > 0 (数据层最终保障)

3. 事务提交

前端面试知识库