你笑了

你的笑,是星星跳跃浪花的笑

0%

数据一致性问题

  • 当临界区代码包含异步操作时,对共享数据的访问跨越多个事件循环,此时会出现数据不一致情况

  • libuv主要执行IO任务,其线程池不是线程安全的,需要在JS层面维护

    多个异步操作(多次调用临界区代码)同时处于临界区内

    参考 Why do you need locking on single threaded nodejs?

  • 下面根据共享数据的来源和编程模型讨论

数据来源

数据库请求

  • 并发修改数据库中的数据时,在数据库层面,通过事务或乐观锁等,解决数据一致性

    某个异步操作先读取变量的值,再执行后续异步子操作。当后续异步子操作完成时,其他异步操作可能已经修改了变量的值,导致数据不一致。这种情况可以在数据库层面通过事务等解决

事务
  • 将获取值、处理和更新值的操作封装在一个事务中。只有当所有操作都成功完成时,事务才会提交,否则会进行回滚
  • 可以提供强一致性,但可能会影响性能
乐观锁
  • 读取数据时不加锁,而在更新数据时检查数据是否发生变化
  • 在数据表中添加一个版本字段,每次更新数据时,都将版本号加一,并在更新时检查版本号是否发生变化。如果在一个接口进行处理的过程中,数据被其他接口修改,那么版本号将不匹配,更新失败。
  • 可以提高并发性能,但在高并发环境下可能会导致大量更新失败

网络请求

某个异步操作在执行过程中通过网络请求修改了共享状态,而另一个异步操作也同时读写这个状态,导致数据不一致。这种情况需要在代码层面通过async/await的方式解决,本质是将异步操作顺序执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
let counter = 0;
function incrementCounter() {
// 读取共享数据。关键在这里,并发调用的异步操作获取的是同一个值。
// 如果不获取,而是直接操作共享数据,不会出现数据不一致问题。
const temp = counter; //闭包
// 模拟一个耗时操作,例如网络请求或定时器
setTimeout(() => {
counter = temp + 1; // 将计数器加一
console.log(`Counter: ${counter}`);
}, Math.random() * 1000);
}

// 并发调用 incrementCounter 函数多次
for (let i = 0; i < 5; i++) {
incrementCounter();
}
  • 加锁和解锁需要一定开销

  • 增加代码复杂性

  • 适合多线程或进程环境

    单线程环境使用 async/await

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
const { Mutex } = require('async-mutex');

let counter = 0;
const mutex = new Mutex();

async function incrementCounter() {
const release = await mutex.acquire();
try {
// 将对共享数据的读、写都加锁
const temp = counter;
// 模拟一个耗时操作,例如网络请求或定时器
await new Promise(resolve => setTimeout(resolve, Math.random() * 1000));
counter = temp + 1;
console.log(`Counter: ${counter}`);
} finally {
release();
}
}

// 并发调用 incrementCounter 函数
for (let i = 0; i < 5; i++) {
incrementCounter();
}
async/await
  • 通过 async/await 将多个异步操作顺序执行

  • 代码简洁

  • 适用于单线程环境

    比加锁方式好,没有加锁和解锁的开销且代码简洁;多线程环境下,失效

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
let counter = 0;

async function incrementCounter() {
const temp = counter;
// 模拟一个耗时操作,例如网络请求或定时器
await new Promise(resolve => setTimeout(resolve, Math.random() * 1000));
counter = temp + 1;
console.log(`Counter: ${counter}`);
}

// 同时调用 incrementCounter 函数多次
async function run() {
for (let i = 0; i < 5; i++) {
await incrementCounter(); // 多个异步操作顺序执行
}
}

run();

编程模型

单线程

  • Node.js是单线程的,使用事件驱动模型。保证了在任何给定的时间点只有一个回调在运行,没有并行的线程访问共享数据,因此通常不需要显式的锁机制来同步代码执行

  • 在Node.js中显式使用锁是一个最后的选择,因为它可能导致性能问题和增加复杂性。应当优先考虑使用异步操作和事件驱动模型来处理并发问题

    1
    2
    3
    4
    5
    6
    7
    8
    let count = 0;

    function increment() {
    count++; // 两个异步操作都修改count变量,但是最终在主线程上是顺序执行的
    }

    setTimeout(increment, 1000);
    setTimeout(increment, 1000);
    • 在两个定时器的回调中对一个共享变量进行操作。然而,我们并不需要在 increment 函数中使用任何形式的锁机制,因为 Node.js 的事件驱动模型保证了在任何给定的时间点只有一个回调在运行。即使两个回调看起来“同时”被调度,但实际上是在不同的事件循环迭代中被执行
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    class Counter {
    private int count = 0;
    private Object lock = new Object();

    public void increment() {
    synchronized(lock) {
    count++;
    }
    }
    }
    • 使用 synchronized 块来确保对 count 的访问是原子的,即一次只有一个线程可以访问。这是因为 Java 支持多线程,并且如果没有适当的同步,两个线程可能同时修改 count,导致数据的不一致。

多进程 / 线程

  • 使用 worker_threads 或有多个不同的Node.js进程时,对于共享数据需要使用信号量等同步机制

    参考 async-mutex 库

    例如多个Node.js 进程写日志,需要访问同一个文件,此时需要加锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    const lockfile = require('proper-lockfile');

    lockfile.lock('some-file') // 尝试获取对 'some-file' 的锁
    .then(release => { // 获取成功,返回一个Promise,通过then释放锁
    // 在这里进行文件的写操作...

    return release(); // 解锁文件
    })
    .catch(err => { // 获取失败,返回一个Promise,通过catch处理异常
    console.error(err);
    });