使稀疏矩阵快速相乘

Make Sparse Matrix Multiply Fast

本文关键字:      更新时间:2023-10-16

代码是使用C++11编写的。每个进程得到两个矩阵数据(稀疏)。测试数据可以从此处输入链接描述下载

测试数据包含2个文件:a0(稀疏矩阵0)和a1(稀疏矩阵1)。文件中的每一行都是"i j v",表示稀疏矩阵第i行、第j列的值为v。i、j、v都是整数。

使用c++11无序映射作为稀疏矩阵的数据结构。

unordered_map<int, unordered_map<int, double> > matrix1 ;
matrix1[i][j] = v ; //means at row i column j of matrix1 is value v;

以下代码耗时约2分钟。编译命令为g++ -O2 -std=c++11 ./matmult.cpp

g++版本是4.8.1,Opensuse 13.1。我的电脑信息:英特尔(R)酷睿(TM)i5-4200U CPU@1.60GHz,4G内存。

#include <iostream>
#include <fstream>
#include <unordered_map>
#include <vector>
#include <thread>
using namespace std;
void load(string fn, unordered_map<int,unordered_map<int, double> > &m) {
  ifstream input ;
  input.open(fn);
  int i, j ; double v;
  while (input >> i >> j >> v)  {
    m[i][j] = v;
  }
}
unordered_map<int,unordered_map<int, double> > m1;
unordered_map<int,unordered_map<int, double> > m2;
//vector<vector<int> > keys(BLK_SIZE);
int main() {
  load("./a0",m1);
  load("./a1",m2);
  for (auto r1 : m1) {
    for (auto r2 : m2) {
      double sim = 0.0 ;
      for (auto c1 : r1.second) {
        auto f = r2.second.find(c1.first);
        if (f != r2.second.end()) {
           sim += (f->second) * (c1.second) ;
        }
      }
   }
  }
  return 0;
}

上面的代码太慢。我怎样才能让它跑得更快?我使用多线程。新代码如下,编译命令为g++ -O2 -std=c++11 -pthread ./test.cpp。大约花了1分钟我希望它更快

如何使任务更快谢谢!

#include <iostream>
#include <fstream>
#include <unordered_map>
#include <vector>
#include <thread>
#define BLK_SIZE 8
using namespace std;
void load(string fn, unordered_map<int,unordered_map<int, double> > &m) {
  ifstream input ;
  input.open(fn);
  int i, j ; double v;
  while (input >> i >> j >> v)  {
    m[i][j] = v;
  }
}
unordered_map<int,unordered_map<int, double> > m1;
unordered_map<int,unordered_map<int, double> > m2;
vector<vector<int> > keys(BLK_SIZE);
void thread_sim(int blk_id) {
  for (auto row1_id : keys[blk_id]) {
    auto r1 = m1[row1_id];
    for (auto r2p : m2) {
      double sim = 0.0;
      for (auto col1 : r1) {
        auto f = r2p.second.find(col1.first);
        if (f != r2p.second.end()) {
          sim += (f->second) * col1.second ;
        }
      }
    }
  }
}
int main() {
  load("./a0",m1);
  load("./a1",m2);
  int df = BLK_SIZE - (m1.size() % BLK_SIZE);
  int blk_rows = (m1.size() + df) / (BLK_SIZE - 1);
  int curr_thread_id  = 0;
  int index = 0;
  for (auto k : m1) {
    keys[curr_thread_id].push_back(k.first);
    index++;
    if (index==blk_rows) {
      index = 0;
      curr_thread_id++;
    }
  }
  cout << "ok" << endl;
  std::thread t[BLK_SIZE];
  for (int i = 0 ; i < BLK_SIZE ; ++i){
    t[i] = std::thread(thread_sim,i);
  }
  for (int i = 0; i< BLK_SIZE; ++i)
    t[i].join();
  return 0 ;
}

大多数情况下,在处理稀疏矩阵时,使用的表示比嵌套映射更有效。典型的选择是压缩稀疏行(CSR)或压缩稀疏列(CSC)。看见https://en.wikipedia.org/wiki/Sparse_matrix详细信息。

您还没有指定示例运行的时间或希望运行的平台。这些是本示例中重要的设计限制。

我可以想到几个方面来提高效率:-

  1. 改进数据存储方式
  2. 改进多线程
  3. 改进算法

第一点是针对系统存储稀疏阵列的方式和能够读取数据的接口。当速度不重要,但可能有更具体的数据结构可用于解决此问题时,嵌套的无序映射是一个不错的选择。往好了说,你可能会找到一个库,它提供了比嵌套映射更好的数据存储方式,往坏了说,也许你必须自己想出一些东西。

第二点是指该语言支持多线程的方式。多线程系统的原始规范是独立于平台的,可能会错过一些系统可能具有的方便功能。决定要针对的系统并使用操作系统的线程系统。您可以更好地控制线程的工作方式,可能会减少开销,但会失去对跨平台的支持。

第三点需要做一些工作。考虑到数据的性质,你乘以矩阵的方法真的是最有效的方法吗。我不是这些方面的专家,但这是需要考虑的,但需要付出一些努力。

最后,您可以对运行的平台非常具体,并进入汇编编程的世界。现代CPU是复杂的野兽。它们有时可以并行执行操作。例如,您可以执行SIMD运算或并行整数和浮点运算。要做到这一点,确实需要深入了解正在发生的事情,而且有一些有用的工具可以帮助你。英特尔确实有一个名为VTune的工具(现在可能是其他工具),可以分析代码并突出潜在的瓶颈。最终,您将希望通过为CPU找到其他事情或改进算法(或两者兼而有之)来消除算法中CPU空闲等待发生某些事情(如等待RAM中的数据)的区域。

最终,为了提高整体速度,你需要知道是什么减缓了速度。这通常意味着知道如何分析代码并理解结果。探查器是这方面的通用工具,但也有特定于平台的工具可用。

我知道这不是你想要的,但快速编写代码真的很难,而且非常耗时。

相关文章:
  • 没有找到相关文章