J05 同步

一、进程和线程基础


  1. 进程

    • 由操作系统创建,用于运行程序,可使多个进程以“假实时”并行方式运行。
    • 选择器负责将处理器资源分配给进程。
    • 通过共享页面完成进程之间通信。
  2. 进程的状态:

    • 状态(États)
      • Actif(活动状态):表示进程正在CPU上运行。
      • Prêt(就绪状态):进程已准备好运行,但正在等待CPU的分配。
      • Bloqué(阻塞状态):进程由于等待某些事件(例如I/O操作或资源可用性)而无法运行。
    • 状态转换事件
      • Création(创建):一个新的进程被创建,通常是由操作系统或一个已有的进程创建的。
      • Suppression(删除):进程被终止并从系统中移除。
      • Fin du quantum(时间片结束):进程的时间片用完,操作系统将CPU分配给其他进程。
      • Demande d’entrée/sortie ou de ressource(请求输入/输出或资源):进程请求某些资源或I/O操作,进入阻塞状态。
      • Fin d’entrée/sortie ou ressource disponible(输入/输出结束或资源可用):进程等待的I/O操作完成或资源变得可用,进程从阻塞状态转移到就绪状态。
      • Élection(调度):操作系统选择某一就绪进程,将其分配到CPU上执行。
      • Terminaison(终止):进程完成执行并被系统终止。
  3. 线程

    • 一个进程可以同时执行多个任务(多线程),实现(伪)并行性(pseudo-parallélisme)。
    • 线程被称为“轻量级进程”(processus légers),它们是进程内部的执行子单元。
    • 线程的执行由调度器(ordonnanceur)控制,调度器负责在系统中分配CPU资源给线程或进程。
    • 在多线程环境中,调度器需要确保各个线程公平地获得执行机会,并避免死锁(deadlock)等问题。
    • 线程使用其所属进程的内存空间和资源,并能够与其他线程进行通信。
  4. Java 中的 Thread

    Java 语言本身原生支持线程(Thread)的管理功能。

    • 线程的属性
      • 每个 Java 线程都有一个名称(Nom),用于标识线程。
      • 线程具有一个整数类型的优先级(Priorité)int),用于调度时的参考。
      • 线程有一个状态(État),由 Thread.State 枚举表示,定义线程的生命周期状态。
    • 线程的组织
      • 线程可以通过 ThreadGroup 按照层次结构(hiérarchiquement)进行组织和管理。
  5. 基本类型

    • Runnable实现流程内容实现展示:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    class MyRunnable {
    public void run() {
    // 实现核心输入输出逻辑
    }
    }

    class TestRunnable {
    public static void main(String[] args) {
    MyRunnable r = new MyRunnable();
    Thread t = new Thread(r);
    t.start();
    }
    }

二、平台线程与线程池


  1. 平台线程

    • 线程由操作系统管理。
    • 系统调度程序负责线程与进程的管理。
    • 线程创建需要系统调用,因而代价较高。
  2. 线程池

    • 创建线程池:使用 Executors.newFixedThreadPool(int) 方法可以创建一个具有固定数量线程的线程池。
    • 提交任务:使用 ExecutorService.submit(Runnable) 方法将任务提交到线程池中执行。
    • 获取执行结果:使用 Future<?>.get() 方法可以获取任务执行的结果或等待任务执行完成。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ThreadPoolExample {
    public static void main(String[] args) {
    // 创建一个固定大小为 3 的线程池
    ExecutorService threadPool = Executors.newFixedThreadPool(3);

    // 提交三个任务到线程池
    Runnable task1 = () -> {
    System.out.println(Thread.currentThread().getName() + " 正在执行任务 1");
    try {
    Thread.sleep(1000); // 模拟耗时操作
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    System.err.println("任务 1 被中断");
    }
    };

    Runnable task2 = () -> {
    System.out.println(Thread.currentThread().getName() + " 正在执行任务 2");
    try {
    Thread.sleep(2000); // 模拟耗时操作
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    System.err.println("任务 2 被中断");
    }
    };

    Runnable task3 = () -> {
    System.out.println(Thread.currentThread().getName() + " 正在执行任务 3");
    try {
    Thread.sleep(1500); // 模拟耗时操作
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    System.err.println("任务 3 被中断");
    }
    };

    // 提交任务
    threadPool.submit(task1);
    threadPool.submit(task2);
    threadPool.submit(task3);

    // 关闭线程池,禁止再提交新的任务
    threadPool.shutdown();

    // 检查线程池是否已经终止
    while (!threadPool.isTerminated()) {
    // 等待线程池中的任务执行完成
    }

    System.out.println("所有任务已完成");
    }
    }

  3. 虚拟线程

    • Java 21 引入虚拟线程,支持创建大量轻量级线程,由 JVM 管理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public class VirtualThreadExample {
    public static void main(String[] args) {
    // 创建一个虚拟线程并启动
    Thread virtualThread = Thread.startVirtualThread(() -> {
    System.out.println(Thread.currentThread().getName() + " 正在执行任务");
    try {
    Thread.sleep(1000); // 模拟耗时操作
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    System.err.println("任务被中断");
    }
    System.out.println(Thread.currentThread().getName() + " 任务完成");
    });

    // 主线程等待虚拟线程完成(可选)
    try {
    virtualThread.join();
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    System.err.println("主线程被中断");
    }

    System.out.println("主线程结束");
    }
    }

三、线程安全问题与解决方案


  1. 线程安全问题:

    • 共享变量可能导致数据竞争问题。
    • 示例代码:
      • creditdebit 方法被调用时,首先会将内存中的 solde 属性加载到处理器的寄存器(register)中。
      • 对于 credit 方法:将传入的金额(montant)与寄存器中的 solde 值相加。
      • 对于 debit 方法:将传入的金额从寄存器中的 solde 值中减去。
      • 在寄存器中完成加/减操作后,会将新的 solde 值从寄存器写回到内存(即类的 solde 属性中)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    class Compte {
    private double solde;

    public void credit(double montant) {
    solde += montant;
    }

    public void debit(double montant) {
    solde -= montant;
    }
    }

    • 上述代码在多线程环境中可能导致错误计算。
  2. 正确的线程安全实现:

    • 使用 synchronized 关键字:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    class Compte {
    private double solde;

    public synchronized void credit(double montant) {
    solde += montant;
    }

    public synchronized void debit(double montant) {
    solde -= montant;
    }
    }
    class Compte {
    private double solde;
    }
    public void credit( double montant ) {
    synchronized( this ) {
    solde += montant;
    }
    }
    public void debit( double montant ) {
    synchronized( this ) {
    solde -= montant;
    }
    }

    • 使用锁机制或 java.util.concurrent 工具包:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    import java.util.concurrent.locks.ReentrantLock;

    class Compte {
    private double solde;
    private final ReentrantLock lock = new ReentrantLock();

    public void credit(double montant) {
    lock.lock();
    try {
    solde += montant;
    } finally {
    lock.unlock();
    }
    }

    public void debit(double montant) {
    lock.lock();
    try {
    solde -= montant;
    } finally {
    lock.unlock();
    }
    }
    }

四、线程执行示例


  1. 正确执行示例:

    • 当线程正确地加载、修改并写入共享资源(如账户余额)时,能够避免竞态条件。
    • 过程描述:
      1. 线程 DAB 加载账户余额(LDR solde),执行减法操作(SUB 20),并将结果写回内存(STR solde)。
      2. 线程 Employeur 加载更新后的余额(LDR solde),执行加法操作(ADD 50),并写回内存(STR solde)。
  2. 错误执行示例:

    • 如果多个线程在未同步的情况下访问共享资源,可能会发生竞态条件,导致数据不一致。
    • 错误过程描述:
      1. 线程 DAB 和线程 Employeur 同时加载初始余额(均为 100)。
      2. 线程 DAB 减去 20 后写回余额为 80,但线程 Employeur 的计算基于未更新的初始余额(100)。
      3. 线程 Employeur 加 50 后写回余额为 150,最终结果错误。
  3. 解决方法:

    • 使用 synchronized 或锁机制确保对共享资源的访问是互斥的,避免竞态条件。

    示例代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    class SafeCompte {
    private double solde;
    private final Object lock = new Object();

    public void credit(double montant) {
    synchronized (lock) {
    solde += montant;
    }
    }

    public void debit(double montant) {
    synchronized (lock) {
    solde -= montant;
    }
    }
    }

五、关键术语与解决方案


  1. 关键资源与互斥
    • 关键资源(Ressource critique):不能被多个进程同时访问的资源。
    • 关键区段(Section critique):访问关键资源的代码部分。
    • 互斥(Exclusion mutuelle):用于限制同时访问关键资源的机制。
  2. 关键属性
    • 安全性(Sûreté):任意时刻最多只有一个进程使用关键资源。
    • 活性(Vivacité)
      • 进程能立即访问空闲资源。
      • 进程在有限时间内能够访问资源。
    • 效率(Efficacité):等待资源的进程不消耗处理器。
  3. 常见算法与系统支持
    • Peterson 和 Dekker 算法提供了解决方案,但可能会导致非效率的主动等待。
    • 操作系统负责管理线程和进程的阻塞状态,从而支持资源访问控制。
  4. 监视器(Moniteur)
    • 监视器是一个支持互斥访问的编程模块。
    • 关键特点:
      • 外部无法访问的变量。
      • 使用互斥机制的函数(同一时间仅一个线程执行)。
    • Java 中,synchronized 关键字用于实现监视器。
    • 示例代码:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    class Compte {
    private double solde;

    public synchronized void credit(double montant) {
    solde += montant;
    **notifyAll(); // 通知其他等待线程**
    }

    public synchronized void debit(double montant) {
    while (solde < montant) {
    **wait(); // 等待条件满足**
    }
    solde -= montant;
    }
    }

六、生产者-消费者模式


  1. 模式描述:

    • 生产者将数据放入共享缓冲区,消费者从缓冲区取出数据。
    • 通过 notify()wait() 实现同步。
  2. 代码实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    class Data {
    private int data;
    private boolean dataPresent = false;

    public synchronized void put(int data) {
    while (dataPresent) {
    wait(); // 等待消费者消费
    }
    this.data = data;
    dataPresent = true;
    notify(); // 通知消费者
    }

    public synchronized int get() {
    while (!dataPresent) {
    wait(); // 等待生产者生产
    }
    dataPresent = false;
    notify(); // 通知生产者
    return data;
    }
    }

  3. 信箱模型(Boîtes à lettres):

    • 多个生产者和消费者共享信箱。
    • 每个信箱存储一条消息。

  4. 半同步模式(Demi Rendez-vous):

    • 使用布尔值变量标记任务状态。
    • 示例代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    class DemiRendezVous {
    private boolean present = false;

    public synchronized void t1() {
    present = true;
    notify();
    }

    public synchronized void t2() {
    while (!present) {
    wait();
    }
    }
    }

  5. 同步模式(Rendez-vous )

    • 同步的 Rendez-vous 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    class RendezVous {
    private boolean t1Present = false;
    private boolean t2Present = false;

    public synchronized void t1() {
    t1Present = true;
    if (!t2Present) {
    wait();
    } else {
    notify();
    }
    }

    public synchronized void t2() {
    t2Present = true;
    if (!t1Present) {
    wait();
    } else {
    notify();
    }
    }
    }

  6. 死锁

    • 错误实现可能导致死锁:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    class MauvaisRendezVous {
    private boolean t1Present = false;
    private boolean t2Present = false;

    public synchronized void t1() {
    t1Present = true;
    if (!t2Present) {
    wait();
    } else {
    notify();
    }
    }

    public synchronized void t2() {
    if (!t1Present) {
    wait();
    } else {
    notify();
    }
    t2Present = true;
    }
    }

七、哲学家就餐问题


  • 多线程经典同步问题。
  • 哲学家需要同时获得左右两侧的资源(筷子)。
  • 常见解决方法:
    • 使用信号量(Semaphore)。
    • 限制资源的同时请求。

八、容器同步与 AtomicInteger


  1. 容器同步问题
    • 标准容器(如 ArrayList)在多线程环境下不安全。
    • java.util.concurrent 提供线程安全容器。
  2. 使用 AtomicInteger
    • 示例代码:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    import java.util.concurrent.atomic.AtomicInteger;

    class Compte {
    private AtomicInteger solde = new AtomicInteger();

    public void credit(double montant) {
    solde.getAndAdd((int) (montant * 100));
    }

    public void debit(double montant) {
    solde.getAndAdd((int) (-montant * 100));
    }

    public double getSolde() {
    return solde.get() / 100.0;
    }
    }

九、信号量(Semaphore)


  1. 基本概念
    • 由 Dijkstra 于 1965 年提出。
    • 包含:
      • 一个计数器。
      • 一个线程等待队列。
      • 三种原子操作:initwaitsignal
  2. 操作描述
    • init(val):初始化计数器为 val,队列为空。
    • wait():计数器减 1,若计数器为负数,则阻塞线程并放入队列。
    • signal():计数器加 1,若计数器为负或 0,则释放一个队列中的线程。
  3. Java 实现
    • java.util.concurrent.Semaphore 提供了信号量的实现。
    • 示例代码:
    1
    2
    3
    4
    5
    6
    7
    import java.util.concurrent.Semaphore;

    Semaphore semaphore = new Semaphore(1);

    semaphore.acquire(); // 获取许可
    // 临界区代码
    semaphore.release(); // 释放许可

十、自动化并行化


  1. Java 8 数据流
    • Collection.stream() 提供顺序数据流处理。
    • Collection.parallelStream() 提供并行数据流处理。
  2. 自动创建线程
    • JVM 自动为并行数据流创建线程。