我有两个线程,并且float这些线程之间共享了一个 -type 值。浮点型值的写入Thread1读取同时开始。Thread2.
Thread1Thread2

 private const int BUFFER_SIZE = 65536 * 8;
 private float _readWriteValue = 0;
 private bool _stop = false;

 private void Thread1()
 {
     Random ran = new Random();
     while (!_stop)
     {
         _readWriteValue = ran. Next(1, 100);
     }
 }
 private void Thread2()
 {
     while (!_stop)
     {
         float[] BufferData  = new float[BUFFER_SIZE];
         for (int i = 0; i < BUFFER_SIZE; i++)
         {
             BufferData[i] = _readWriteValue;
         }
         ProcessMethod(BufferData);
     }
 }

 private void ProcessMethod(float[] data)
 {
     // Do Something
 }

所以,当我检查时BufferData,它只填充一个数字。例如,BufferData只填充22。似乎当_readWriteValue进入for循环时Thread2,已被锁定并且Thread1无法在其中写入新值。

我正在尝试找出解决方案。我正在尝试lockMonitor、 和ConcurrentQueue,但每次我都会得到相同的结果。BufferData仅由一个数字填充。

难道是我对多线程的理解有误?我应该怎么办?

13

  • 1
    无法复制您的问题。


    – 


  • 1
    由于它在屏幕截图中被剪掉,因此打印行是Console.WriteLine($"Distinct Values (5 or more): {string.Join(", ", data.Distinct().Take(5))}");


    – 

  • 1
    我建议使用 Interlocked 类 – 设置值Interlocked.Exchange /… 并读取值Interlocked.Read


    – 

  • 1
    即使只有 10 的缓冲区,仍然无法重现它:


    – 

  • 1
    为什么是“每个索引”?您意识到随机可以两次返回相同的值吗?您是否意识到在设置新的随机值之前填充数组可能会填充多个元素?


    – 


3 个回答
3

难道是我对多线程的理解有误?我应该怎么办?

是的,看来您不熟悉以及可能其他概念。如果您渴望编写正确的无锁代码,则必须熟悉所有这些概念。这被认为是一项极其困难的任务,比在语句保护下进行多线程更具挑战性

下面的两种解决方案都不能完全防止重复值。它们只是确保读者可以在作者做出更改时观察到更改。 如果在一次写入之间完成多次读取迭代,则会有一定次数的重复。特别是仅进行volatile读取(本答案中的解决方案 2)时,您将有相当多的重复,因为读取器只是执行纯加载,这些加载将命中 L1d 缓存,直到写入器使其无效。但如果数组足够大,您仍然会看到不同的随机值,并且运行长度将大致对应于该运行中的速度比。

解决方案1:简单锁定

是与受保护的代码部分中类的可变状态(_readWriteValue_stop字段)进行交互,其中一次只能有一个线程进入。这个概念也称为同步互斥,名称的由来

private const int BUFFER_SIZE = 65536 * 8;

private readonly object _locker = new();
private float _readWriteValue = 0;
private bool _stop = false;

private void Thread1()
{
    Random random = new Random();
    while (true)
    {
        float localValue = random.Next(1, 100);
        lock (_locker)
        {
            _readWriteValue = localValue;
            if (_stop) break;
        }
    }
}

private void Thread2()
{
    while (true)
    {
        lock (_locker)
        {
            if (_stop) break;
        }
        float[] bufferData = new float[BUFFER_SIZE];
        for (int i = 0; i < BUFFER_SIZE; i++)
        {
            float localValue;
            lock (_locker)
            {
                localValue = _readWriteValue;
            }
            bufferData[i] = localValue;
        }
        ProcessMethod(bufferData);
    }
}

public void Stop()
{
    lock (_locker) _stop = true;
}

请注意,我lock仅使用 来保护与共享状态的交互,而不保护其他任何内容。调用ProcessMethod不受保护,因为它不必受保护。此方法仅与本地bufferData参数交互,不与其他线程共享。将其包含在 中lock会无缘无故地增加锁争用。

_locker在所有语句中使用相同的对象非常重要lock。小心和遵守纪律也很重要。对类的共享状态进行一次不受保护的访问就足以毁掉一切并使程序的行为变得不确定。

解决方案 2:使用 volatile 进行无锁

解决数字永远重复问题的无锁解决方案是将字段声明为

private volatile float _readWriteValue = 0;
private volatile bool _stop = false;

bufferData这将防止在( )中无限重复相同的数字,因为volatile关键字通知编译器该字段可能会被同时执行的多个线程修改,因此它不会进行旨在进行的优化单线程程序。

在您的情况下,编译器很可能通过将字段的值存储_readWriteValue在局部变量(在线程的堆栈中)中来优化您的代码,并永远重用该值,而不是在堆中的对象中查找实际值。如果您不另外说明,C# 编译器会假定您的程序是单线程的,并且可能会进行优化以使您的程序更快、更高效。编译器不会(也不应该)关心这样做是否会对未正确编写的多线程代码造成严重破坏。

换句话说,所有对单线程代码安全的优化都允许用于普通变量,这就是为什么在不锁定的情况下共享它们或者volatile某些同时访问是写入的情况下共享它们是一个坏主意。因此,在每个线程中,不与其他线程通信的代码可以得到充分优化。

它可能会重复几次,甚至数百次,具体取决于速度有多慢ran.Next;与负载相比,FP RNG 相当慢。一旦一次加载成功,后续加载将命中 L1d 缓存,直到另一个线程作为其读取所有权 (RFO) 的一部分使我们的副本无效,以获得缓存行 ( ) 的独占所有权,以便写入它。这样读者就会有突飞猛进的进步。无序执行和多个加载缓冲区可能会使其更加突发,因为许多加载可以排队等待我们下次获取缓存行的副本。然后,当该行的副本到达以满足我们的共享请求时,所有这些 L2 缓存未命中将立即完成。

通过锁定,突发会短得多(以数组元素的数量计),因为读取器的每次锁定/解锁比简单的加载和存储到数组要慢得多。

12

  • volatile没有锁如何防止重复数字?Random.Next比赋值慢很多,甚至可能比整个循环慢。 fiddle 起作用的唯一原因是Thread.Sleep驱逐线程并需要远远超过 10 毫秒的时间来重新调度。删除它,大多数迭代都会得到重复值。有些甚至得到单个重复值


    – 


  • @PanagiotisKanavos 可以volatile防止重复始终相同的数字,这是OP观察到并想要解释的内容。通知volatile编译器该字段被多个线程使用,因此编译器发出的指令始终直接读取该字段,而不是将其值缓存在局部变量中,并永远重用该值。当然,缓冲区可能偶尔会被相同的数字填充,例如,如果在运行Thread1时操作系统将其挂起几毫秒。Thread2


    – 


  • 所以你仍然需要锁。这不是无锁选项。这不是挂起问题,Thread1 中的代码比 Thread2 中的循环慢,因此总是存在竞争条件


    – 


  • 1
    使用volatile,而不是获得所有相同的数字,使用足够大的数组,您将获得相同数字的游程,其中游程长度对应于速度比。你的答案应该告诉OP去寻找它(它可能在工具提示弹出窗口中不可见,就像它们在屏幕截图中显示的那样;这只有大约十几个值)。


    – 

  • 1
    你的回答听起来就像volatile独自一人将完全避免重复的值。它不会,但它是无锁读取器的必要组成部分,它重复观察共享变量(如_stopor )_readWriteValue。记录读取循环看到的值的演示很有趣,有助于理解不同步的线程所发生的情况,并且这种情况对于无锁代码来说是正常的。


    – 

简而言之,您需要一个像 AutoResetEvents 这样的机制,或者像 BlockingQueue 或 Channel 这样的线程之间的线程安全队列。锁和易失性还不够。

_readWriteValue未锁定。循环的Thread2运行速度比调用快得多,ran.Next(1, 100)因此整个循环可以在ran.Next(1, 100)有机会第二次运行之前完成。即使您添加volatile以阻止某些优化,_readWriteValue在循环运行时仍然具有原始值。

这称为。执行结果完全取决于每个线程的运行速度。要解决此问题,您需要一种同步写入器 Thread1 和读取器 Thread2 的方法。

这是一个更加棘手的问题,因为读者必须读取作者产生的所有值。这需要读取器和写入器之间的双向同步。

将读/写与事件同步

为了避免这个问题,需要一种方法来确保读取器(Thread2)只能在写入器(Thread1)写入值之后读取该值,并且写入器在读取器读取前一个值之前无法写入该值。

执行此操作的一种方法是使用类。线程可以等待 AutoResetEvent 类,直到另一个线程向它发出信号。当发生这种情况时,等待线程继续并且事件自动重置。

一个事件用于指示 Thread1 生成了一个值,另一个事件用于指示 Thread2 消耗了该值。此片段不重复数字:

static readonly AutoResetEvent _thread1Step = new AutoResetEvent(false);
static readonly AutoResetEvent _thread2Step = new AutoResetEvent(true);

private static void Thread1()
{
    Random ran = new Random(100);
    while (!_stop)
    {
        
        _thread2Step.WaitOne();
        _readWriteValue = ran.Next(1, 100);
        _thread1Step.Set();
    }
}

private static void Thread2()
{
    while (!_stop)
    {
        
        const int BUFFER_SIZE = 100;
        float[] bufferData = new float[BUFFER_SIZE];
        for (int i = 0; i < BUFFER_SIZE; i++)
        {
            _thread1Step.WaitOne();
            bufferData[i] = _readWriteValue;
            _thread2Step.Set();
        }
        // Print statistics
        Console.WriteLine(String.Join(", ", bufferData));
        Thread.Sleep(10);
    }
}

这打印

96, 16, 67, 90, 36, 94, 71, 61, 35, 15, 96, 54, 38, 50, 97, 48, 89, 99, 39, 4, 11, 38, 89, 96, 50, 84, 91, 65, 17, 62, 11, 79, 51, 50, 56, 10, 86, 53, 47, 74, 28, 50, 34, 59, 63, 86, 36, 3, 91, 44, 3, 79, 30, 86, 22, 59, 27, 70, 40, 51, 4, 7, 44, 73, 5, 17, 3, 88, 93, 88, 62, 37, 53, 64, 76, 60, 5, 30, 34, 64, 49, 88, 73, 73, 59, 32, 50, 65, 28, 97, 83, 17, 13, 95, 70, 21, 6, 60, 55, 46
83, 48, 9, 4, 82, 66, 26, 65, 10, 61, 54, 96, 36, 76, 3, 15, 33, 71, 15, 73, 67, 38, 60, 96, 6, 45, 24, 57, 94, 55, 55, 45, 75, 87, 81, 1, 79, 70, 90, 92, 6, 85, 55, 66, 44, 86, 80, 37, 93, 55, 88, 34, 46, 81, 78, 45, 87, 13, 98, 37, 42, 69, 70, 55, 7, 10, 21, 49, 94, 2, 36, 63, 80, 22, 67, 82, 83, 93, 53, 19, 65, 87, 63, 40, 67, 21, 98, 93, 9, 36, 14, 67, 72, 54, 51, 36, 15, 99, 60, 34
66, 32, 42, 91, 20, 25, 54, 25, 15, 96, 62, 93, 60, 79, 72, 55, 6, 31, 88, 85, 11, 27, 40, 58, 88, 68, 91, 27, 71, 31, 67, 83, 34, 19, 53, 33, 45, 72, 21, 42, 67, 74, 79, 13, 74, 20, 7, 92, 81, 95, 30, 84, 12, 74, 24, 39, 92, 83, 4, 51, 33, 28, 53, 83, 29, 78, 60, 42, 27, 39, 11, 34, 10, 46, 19, 36, 48, 27, 84, 68, 62, 99, 45, 75, 1, 83, 72, 59, 94, 14, 41, 61, 68, 69, 9, 39, 21, 96, 83, 96

使用 BlockingQueue 有效

像这样将一个线程耦合到另一个线程并不总是一个好主意,因为每个线程都必须等待另一个线程继续。如果一个人因为一个原因放慢速度,另一个原因也会放慢速度。避免这种情况的一种方法是使用线程安全队列

像另一个答案一样使用 BlockingQueue 只需稍作修改即可工作:

private static BlockingCollection<float> _queue=new BlockingCollection<float>(100);

private static void Thread1()
{
    Random ran = new Random();
    while (!_stop)
    {
        _queue.Add( ran.Next(1, 100));
    }
}

private static void Thread2()
{
    while (!_stop)
    {
        const int BUFFER_SIZE = 100;
        float[] bufferData = new float[BUFFER_SIZE];
        for(int i=0;i<BUFFER_SIZE;i++)
        {
            bufferData[i] = _queue.Take();
        }
        // Print statistics
        Console.WriteLine(String.Join(", ", bufferData));
        Thread.Sleep(10);
    }
}

和印刷品

18, 70, 21, 74, 38, 77, 75, 15, 16, 62, 97, 22, 57, 98, 37, 63, 26, 80, 47, 21, 50, 94, 7, 84, 84, 78, 11, 92, 5, 82, 37, 4, 25, 1, 21, 62, 46, 97, 83, 87, 72, 33, 99, 50, 59, 69, 33, 88, 65, 86, 55, 56, 77, 44, 6, 31, 31, 19, 13, 60, 38, 12, 67, 12, 88, 34, 18, 8, 94, 86, 26, 7, 20, 87, 32, 37, 93, 46, 82, 58, 44, 57, 89, 10, 26, 38, 51, 54, 42, 56, 16, 87, 66, 66, 79, 55, 50, 87, 18, 90
94, 37, 77, 91, 62, 12, 48, 96, 91, 12, 84, 45, 80, 5, 32, 59, 4, 72, 47, 83, 88, 74, 63, 39, 62, 28, 32, 86, 88, 38, 85, 62, 18, 84, 82, 36, 42, 81, 20, 66, 36, 30, 92, 31, 62, 54, 68, 25, 84, 67, 31, 26, 16, 15, 32, 67, 22, 33, 22, 68, 68, 55, 6, 59, 81, 18, 6, 46, 10, 33, 73, 78, 65, 37, 84, 79, 34, 34, 51, 21, 6, 51, 80, 25, 53, 30, 50, 39, 53, 5, 1, 44, 36, 70, 57, 39, 67, 24, 37, 47
43, 61, 86, 89, 87, 83, 29, 23, 24, 78, 28, 10, 39, 90, 22, 66, 23, 70, 51, 48, 83, 3, 23, 92, 29, 22, 30, 98, 96, 16, 96, 99, 71, 85, 42, 96, 47, 57, 4, 7, 98, 8, 28, 91, 38, 6, 27, 69, 93, 65, 42, 70, 22, 53, 67, 57, 36, 45, 81, 89, 63, 42, 52, 63, 59, 47, 33, 1, 66, 49, 9, 3, 46, 60, 6, 23, 76, 83, 48, 99, 61, 10, 4, 42, 22, 57, 15, 5, 9, 51, 89, 37, 40, 80, 61, 46, 56, 71, 24, 

这是因为编写器 Thread1 将所有值写入线程安全队列。读者Thread2将这些内容一一阅读。使用容量为 100 的队列意味着如果写入器比读取器更快,则值不会丢失。如果集合已满,则写入者将被阻止。

如果将容量设置为 1,则可以有效地同步运行两个线程。

使用 CancellationToken

_stop本身需要锁定。不过,最好使用专门构建的类,而不是添加锁。这是一个结构体,可以传递给支持取消的方法,以告诉它们何时取消。几乎所有支持取消的 .NET 方法都使用 CancellationToken,其中包括BlockingCollection.Take

private static CancellationTokenSource _cts=new ();

private static BlockingCollection<float> _queue=new (100);

public static void Stop()
{
    _cts.Cancel();
}
private static void Thread1()
{
    Random ran = new Random();
    var token=_cts.CancellationToken;

    while (!token.IsCancellationRequested)
    {
        _queue.Add( ran.Next(1, 100));
    }
}

private static void Thread2()
{
    var token=_cts.CancellationToken;

    while (!_token.IsCancellationRequested)
    {
        const int BUFFER_SIZE = 100;
        float[] bufferData = new float[BUFFER_SIZE];
        for(int i=0;i<BUFFER_SIZE;i++)
        {
            bufferData[i] = _queue.Take(token);
        }
        // Print statistics
        Console.WriteLine(String.Join(", ", bufferData));
        Thread.Sleep(10);
    }
}

现代方式:async/await

通过使用任务、异步/等待和通道(相当于 BlockingCollection 的异步方式)可以大大简化代码。这次不需要任何字段。通道和令牌可以作为参数传递给 Writer 和 Reader 函数:

public static async Task Main()
{
    var  cts=new CancellationTokenSource(TimeSpan.FromSeconds(10));
    var channel=Channel.CreateBounded<float>(100);

    var task1=Task.Run(()=>Writer(channel.Writer,cts.Token));
    var task2=Task.Run(()=>Reader(channel.Reader,cts.Token));
    await Task.WhenAll(task1,task2);
}
        

private static async Task Writer(ChannelWriter<float> writer,CancellationToken token)
{
    Random ran = new Random();
    while (!token.IsCancellationRequested)
    {
        var value = ran.Next(1, 100);
        await writer.WriteAsync(value,token);
    }
}

private static async Task Reader(ChannelReader<float> reader,CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {           
        const int BUFFER_SIZE = 100;
        float[] bufferData = new float[BUFFER_SIZE];
        for (int i = 0; i < BUFFER_SIZE; i++)
        {               
            bufferData[i] = await reader.ReadAsync(token);
        }
        // Print statistics
        Console.WriteLine(String.Join(", ", bufferData));
        await Task.Delay(10);
    }
}

输出是:

54, 38, 99, 32, 82, 40, 50, 71, 39, 84, 69, 75, 69, 91, 70, 46, 20, 82, 72, 34, 27, 26, 79, 25, 48, 28, 84, 98, 4, 39, 99, 30, 11, 28, 47, 63, 89, 27, 35, 14, 92, 84, 33, 36, 69, 55, 65, 80, 59, 24, 63, 83, 6, 79, 18, 23, 5, 72, 47, 22, 36, 89, 51, 9, 57, 16, 50, 84, 35, 97, 35, 24, 36, 38, 99, 5, 31, 16, 14, 9, 62, 51, 67, 25, 29, 61, 20, 22, 31, 58, 93, 98, 97, 5, 89, 93, 56, 58, 74, 41
7, 59, 61, 7, 19, 33, 37, 67, 57, 25, 99, 13, 80, 56, 46, 21, 2, 41, 97, 84, 11, 40, 11, 58, 12, 94, 1, 23, 79, 20, 15, 26, 40, 51, 51, 55, 42, 11, 34, 39, 5, 99, 1, 59, 5, 75, 8, 74, 93, 23, 66, 63, 41, 16, 71, 36, 88, 74, 61, 38, 63, 46, 50, 10, 26, 64, 25, 28, 28, 55, 82, 5, 80, 68, 52, 79, 37, 56, 80, 91, 67, 87, 3, 17, 87, 84, 87, 94, 55, 13, 64, 92, 60, 99, 64, 53, 72, 87, 14, 99
64, 33, 93, 29, 14, 60, 48, 40, 46, 71, 5, 96, 12, 22, 50, 49, 36, 78, 27, 95, 24, 40, 8, 45, 29, 27, 6, 33, 99, 62, 84, 85, 29, 72, 5, 79, 73, 17, 88, 92, 38, 16, 39, 49, 66, 70, 37, 99, 64, 11, 31, 61, 3, 46, 21, 3, 71, 31, 68, 87, 14, 19, 24, 80, 93, 14, 97, 90, 13, 32, 90, 64, 97, 92, 37, 49, 66, 2, 32, 93, 57, 72, 97, 4, 75, 62, 80, 17, 73, 98, 32, 43, 6, 64, 38, 92, 73, 21, 55, 65

挥发性不起作用

即使禁用编译器优化,Thread1 生成值的速度也比 Thread2 读取它们的速度慢很多。

编译器和 CPU 优化可以将循环重写为复制值的代码_readWriteValue,并将其直接复制到每个位置,甚至可能使用 SIMD 操作一次分配多个位置。关键字可用于告诉编译器避免此类优化。它不引入任何类型的锁定。

该代码几乎原封不动地取自其他答案之一,重复打印相同的数字,就像您自己的代码一样:

using System;
using System.Threading;
using System.Linq;

public class Program
{
    private static volatile float _readWriteValue = 0;
    private static volatile bool _stop = false;

    public static void Main()
    {
        Timer timer = new(_ => _stop = true);
        timer.Change(200, Timeout.Infinite);

        Thread t1 = new(() => Thread1());
        Thread t2 = new(() => Thread2());

        t1.Start(); t2.Start();
        t1.Join(); t2.Join();
    }

    private static void Thread1()
    {
        Random ran = new Random();
        while (!_stop)
        {
            _readWriteValue = ran.Next(1, 100);
        }
    }

    private static void Thread2()
    {
        while (!_stop)
        {
            const int BUFFER_SIZE = 100;
            float[] bufferData = new float[BUFFER_SIZE];
            for (int i = 0; i < BUFFER_SIZE; i++)
            {
                bufferData[i] = _readWriteValue;
            }
            // Print statistics
            Console.WriteLine(String.Join(", ", bufferData));
            Thread.Sleep(10);
        }
    }
}

结果:

73, 73, 73, 73, 73, 73, 73, 73, 73, 73, 73, 73, 73, 73, 73, 69, 69, 69, 69, 69, 69, 69, 69, 69, 69, 69, 69, 69, 69, 69, 69, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, 79, 79, 79, 79, 79, 79, 79, 79, 79, 79, 79, 79, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 66, 66, 66, 66
4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 94, 94, 94, 94, 94, 94, 94, 94, 94, 94, 94, 94, 94, 94, 94, 94, 94, 94, 88, 88, 88, 88, 88, 88, 88, 88, 88, 88, 88, 88, 88, 88, 88, 88, 88, 88, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 40, 40, 40, 40
24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 13, 13, 13, 13, 13
50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 57, 57, 57, 57, 57, 57, 57, 57, 57, 57, 57, 57, 57, 57, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 22, 22, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 60, 60, 60, 60, 60, 60, 60, 60, 60, 60, 60, 60, 60, 43, 43, 43, 43, 43, 43, 43, 43, 43, 43, 43, 43, 43, 43, 54, 54
59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 81, 81, 81, 81, 81, 81, 81, 81, 81, 81, 81, 81, 81, 81, 81, 81, 61, 61, 61, 61, 61, 61, 61, 61, 61, 61, 61, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 41, 41, 41, 41, 41, 41, 41, 41, 41, 41, 41, 41, 44, 44, 44, 44, 44, 44, 44, 44, 44, 44, 44, 44, 44, 44, 75, 75, 75, 75, 75, 75, 75, 75, 75, 75, 75, 75, 75, 75, 75, 74, 74

每次写入大约有 10-15 次读取操作。

仅有锁是不够的

原因是相同的 – 线程以不同的速度运行,读取器读取值的速度总是比写入器生成值的速度快。

代码的这种变体也重复值:

private static object _locker=new();

private static void Thread1()
{
    Random ran = new Random(100);
    while (!_stop)
    {
        lock(_locker)
        {
            _readWriteValue = ran.Next(1, 100);
        }
    }
}

private static void Thread2()
{
    while (!_stop)
    {
        const int BUFFER_SIZE = 100;
        float[] bufferData = new float[BUFFER_SIZE];
        for (int i = 0; i < BUFFER_SIZE; i++)
        {
            lock(_locker)
            {
                bufferData[i] = _readWriteValue;
            }
        }
        // Print statistics
        Console.WriteLine(String.Join(", ", bufferData));
        Thread.Sleep(10);
    }

这打印

55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 81, 81, 81, 81, 81, 81, 81, 81, 81, 81, 69, 69, 69, 69, 69, 69, 22, 22, 22, 22, 22, 22, 21, 21, 21, 21, 21, 21, 21, 21, 21, 21, 21, 21, 21, 54, 54, 54, 58, 58, 58, 58, 83, 83, 83, 53, 53, 20, 20, 20, 20, 20, 20, 33, 33, 33, 33, 33, 33, 33, 33, 33, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39
29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 40, 38, 38, 38, 38, 38, 38, 38, 38, 38, 38, 38, 38, 38, 38, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71
43, 43, 43, 43, 43, 43, 43, 43, 43, 43, 43, 43, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 8, 8, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59

该示例不是线程安全的。每当您读取和写入任何共享值时,您都需要某种形式的同步。编译器基本上可以将您的示例重写为:

     private void Thread2()
     {
         var value = _readWriteValue;
         while (!_stop)
         {
             float[] BufferData  = new float[BUFFER_SIZE];
             for (int i = 0; i < BUFFER_SIZE; i++)
             {
                 BufferData[i] = value ;
             }
             ProcessMethod(BufferData);
         }
     }

仅仅添加volatile_readWriteValue可能有助于线程安全方面,但是何时以及是否 volatile 合适还存在争议。 Eric Lippert 在《

坦率地说,我不鼓励你创建一个不稳定的领域

另一种选择是插入,但除非您真的知道自己在做什么,否则我建议使用锁。

还有一个重要问题是循环运行所需的时间可能相差很大。如果您想生成一次读取的单个值,您应该使用队列进行通信。有很多方法可以做到这一点,但一个简单的方法是使用

private BlockingCollection<float> queue = new BlockingCollection(boundedCapacity:100);
private void Thread1()
{
     Random ran = new Random();
     while (!_stop)
     {
         queue.Add( ran. Next(1, 100))
     }
    queue.CompleteAdding();
}
private void Thread2()
{
     int i = 0;
     float[] BufferData  = new float[BUFFER_SIZE];
     foreach(var value in queue){
          i = (i + 1) % 10;
          BufferData[i] = value;
          if(i == 0){
              ProcessMethod(BufferData);
          }
    }
}

这使用了“有限容量”,因此如果没有更多空间,线程 1 将阻塞,如果没有可用值,线程 2 将阻塞。

8

  • 1
    ConcurrentQueue 或 Channel<float> 也是不错的选择。


    – 

  • 2
    我不同意阻止人们使用关键字,因为它是他们想要做的事情的最佳工具。


    – 

  • 1
    @TheodorZoulias,这不是向运行速度比生产者快得多的消费者发布数据的正确工具。Random.Next比本地数组赋值慢几个数量级


    – 


  • 1
    并且volatile不会阻止重复值


    – 


  • 1
    @PanagiotisKanavos:建议避免无锁编程可能更有意义(除非你真的确定它是适合这项工作的工具,并且你想付出努力来处理所需的复杂性)修正它)。volatile对于那些尝试使用无锁代码的人来说,告诉人们避免是有帮助的,就像OP一样。它本身并不能解决这个问题,但它是重复观察共享变量的循环的必要部分。


    –