如何强制线程轮流连接到字符串?

How do I force threads to take turn concatenating to a string?

本文关键字:字符串 连接 流连 何强制 线程      更新时间:2023-10-16

>基本上我有 2 个文本文件,每个文件都有一堆 1 个字符长的行。一个文件中的每个字符都是一个字母或零,如果字符为零,我需要查看另一个文件以查看应该存在的内容。我的目标是启动两个线程,每个线程读取一个单独的文件并将每个字符添加到字符串中。

文件 1:

t
0
i
s
0
0
0
t
e
0
t

文件 2:

0
h
0
0
i
s
a
0
0
s
0

因此,此的预期输出应该是"thisisatest"。

我目前能够运行这两个线程并让每个线程读取各自的文件,我知道我需要使用互斥锁(( 和 unlock(( 来确保一次只有一个线程添加到字符串中,但我在弄清楚如何实现它时遇到了麻烦。

mutex m;
int i = 0;
string s = "";
void *readFile(string fileName) {
ifstream file;
char a;
file.open(fileName);
if(!file) {
cout << "Failed to open file." << endl;
exit(1);
}
while(file >> a) {
if(a == '0') {
} else {
s += a;
}
}
}
int main() {
thread p1(readFile, "Person1");
thread p2(readFile, "Person2");
p1.join();
p2.join();
cout << s << endl;
return 0;
}

我尝试将 m.lock(( 放在 while(( 循环中,并将 m.unlock(( 嵌套在 if(( 语句中,但它不起作用。目前,我的代码将只输出没有零的 file1 和没有连接零的 file2(不按任何特定顺序,因为无法预测哪个线程首先完成(。

我希望程序查看文本文件,检查当前行上的字符,如果是字母,则将其连接到字符串 s,如果是零,暂停此线程并让另一个线程检查它的行。

您需要确保两个线程同步运行,一次轮流读取一行。读取0时,跳过轮次,否则打印值。

为此,您可以使用:

  • 在工作线程之间共享的变量,用于跟踪轮次;
  • 一个条件变量,用于通知线程轮次更改;
  • 使条件变量工作的互斥锁。

下面是一个演示轮流方法的工作示例:

#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
int main() {
std::mutex mtx;
std::condition_variable cond;
int turn = 0;
auto task = [&](int myturn, int turns) {
std::unique_lock<std::mutex> lock(mtx);
while (turn < 9) {
cond.wait(lock, [&] { return turn % turns == myturn; });
std::cout << "Task " << myturn << std::endl;
turn++;
cond.notify_all();
}
};
std::thread p1(task, 0, 2);
std::thread p2(task, 1, 2);
p1.join();
p2.join();
std::cout << "Done" << std::endl;
}

输出:

Task 0
Task 1
Task 0
Task 1
Task 0
Task 1
Task 0
Task 1
Task 0
Task 1
Done

考虑到字符串中每个字母必须去的索引位置是预先确定的,并且很容易从数据中计算出来。

读取第二个文件的线程:

0
h
0
0
i
s

知道它不对str[0]str[2]str[3]的角色负责,而是对str[1]str[4]str[5]负责。

如果我们添加一个互斥锁和一个条件变量,算法很简单。

index = 0
while reading a line from the file succeeds: {
if the line isn't "0": {
lock(mutex)
while length(str) < index: {
wait(condition, mutex)
}
assert(length(str) == index)
add line[0] to end of str
unlock(mutex)
broadcast(condition)
}
index++
}

基本上,对于线程需要写入的每个字符,它都知道索引。它首先等待字符串获得那么长的时间,其他线程将执行此操作。每当线程添加一个字符时,它都会广播条件变量,以唤醒另一个想要在新索引中放置字符的线程。

assert检查永远不应该关闭,除非数据是错误的(告诉两个或多个线程将字符放在同一个索引处(。 此外,如果所有线程都在同一索引处命中0行,当然,这将死锁;每个线程都将等待另一个线程在该索引处放置一个字符。

另一种解决方案是使用称为屏障的同步对象。这个问题非常适合障碍,因为我们拥有的是一组线程并行处理一些数据元组。对于每个元组,只有一个线程必须执行操作。

算法是这样的:

// initialization:
init(barrier, 2)  // number of threads

// each thread:
while able to read line from file: {
if line is not "0":
append line[0] to str
wait(barrier)
}

wait(barrier)所做的是延迟执行,直到 2 个线程调用它(因为我们将其初始化为 2(。 发生这种情况时,将释放所有线程。然后屏障在下一个wait重置,然后它将再次等待 2 个线程。

因此,执行是序列化的:线程在执行文件时以锁定步骤执行循环体。读取字符而不是0的线程将其添加到字符串中。其他线程不接触字符串;他们直接进入屏障等待,因此没有数据竞争。