Q: Loading a bigger-than-memory HDF5 file in PySpark

I have a big file (say 20 GB) stored in HDF5 format. The file is basically a set of 3D coordinates that evolve over time (a molecular simulation trajectory), i.e. an array of shape (8000 frames, 50000 particles, 3 coordinates).

In regular Python I would simply open the HDF5 file with h5py or PyTables and index the dataset as if it were a NumPy array (the library lazily loads whatever data it needs).
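
For reference, the plain-Python access pattern described above looks roughly like this (a minimal sketch; the file name and the dataset name "coordinates" are placeholders for whatever the file actually contains):

import h5py

# opening the file reads no coordinate data yet
with h5py.File("trajectory.h5", "r") as f:     # placeholder file name
    traj = f["coordinates"]                    # placeholder dataset name
    print(traj.shape)                          # (8000, 50000, 3)
    frame = traj[100]                          # only this frame is read from disk
    x0 = traj[:, 0, 0]                         # one particle's x over all frames, read lazily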

However, if I try to load this file into Spark using SparkContext.parallelize, it obviously clogs the memory:

sc.parallelize(data, 10)

How can I handle this problem? Is there a preferred data format for huge arrays? Can I have the RDD written to disk without going through memory?

Answer 1:

Spark (and Hadoop) has no support for reading parts of HDF5 binary files. (I suspect the reason is that HDF5 is a container format for storing documents and it allows a tree-like hierarchy to be specified for them.)

But if you need to read the file from local disk, it is doable with Spark, especially if you know the internal structure of your HDF5 file.

Here is an example. It assumes that you'll run a local Spark job, and that you know in advance that your HDF5 dataset '/mydata' consists of 100 chunks.

import h5py as h5

h5file_path = "/absolute/path/to/file"

def readchunk(v):
    # open the file inside the task so the handle lives on the executor
    f5 = h5.File(h5file_path, "r")
    # each record is one slice along the first axis (e.g. one frame of the trajectory)
    return f5['/mydata'][v, :]

foo = sc.parallelize(range(0, 100)).map(readchunk)
foo.count()

Going further, you can modify the program to detect the number of chunks with f5['/mydata'].shape[0].

The next step would be to iterate over multiple datasets (you can list the datasets with f5.keys()), as sketched below.
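
Sketched together, those two steps could look like this (assuming the same h5file_path and '/mydata' as above, that every top-level key is a dataset, and that all datasets share the same first-axis length; the driver opens the file only to read metadata):

import h5py as h5

h5file_path = "/absolute/path/to/file"

# driver side: open the file briefly just to read metadata
with h5.File(h5file_path, "r") as f5:
    n_chunks = f5['/mydata'].shape[0]      # number of slices along the first axis
    dataset_names = list(f5.keys())        # assumes every top-level key is a dataset

def readchunk(name, v):
    # executor side: reopen the file and read one slice of one dataset
    f5 = h5.File(h5file_path, "r")
    return f5[name][v, :]

# one RDD per dataset, parallelized over the detected number of chunks
rdds = {
    name: sc.parallelize(range(n_chunks)).map(lambda v, name=name: readchunk(name, v))
    for name in dataset_names
}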

There is also another article, "From HDF5 Datasets to Apache Spark RDDs", that describes a similar approach.

The same approach works on a distributed cluster, but it gets a little inefficient: h5py requires the file to be on a local file system. This can be achieved in several ways: copy the file to all workers and keep it at the same location on each worker's disk, or put the file on HDFS and mount HDFS using FUSE so the workers can access the file. Both ways have some inefficiencies, but they should be good enough for ad-hoc tasks.
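
As a sketch of the first option (copying the file to all workers), one way to do the copy is to let Spark itself distribute the file with sc.addFile and resolve the executor-local copy with SparkFiles.get. This is not from the original answer, and shipping a 20 GB file this way is slow, but it keeps the paths consistent across workers:

import os
import h5py as h5
from pyspark import SparkFiles

h5file_path = "/absolute/path/to/file"
sc.addFile(h5file_path)   # Spark copies the file to every executor's work directory

def readchunk(v):
    # resolve the executor-local copy by its basename
    local_path = SparkFiles.get(os.path.basename(h5file_path))
    f5 = h5.File(local_path, "r")
    return f5['/mydata'][v, :]

foo = sc.parallelize(range(0, 100)).map(readchunk)
foo.count()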

Here is an optimized version that opens the HDF5 file only once on every executor:

import h5py as h5

h5file_path = "/absolute/path/to/file"

_h5file = None
def readchunk(v):
    # the code below runs on an executor - in another Python process on a remote server
    # the original value of _h5file (None) is shipped from the driver;
    # on the executor it is replaced with an h5.File object the first time readchunk is called
    global _h5file
    if _h5file is None:
        _h5file = h5.File(h5file_path, "r")
    return _h5file['/mydata'][v, :]

foo = sc.parallelize(range(0, 100)).map(readchunk)
foo.count()
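
A variation on the same idea, not from the original answer but using only the standard RDD API, is to open the file once per partition with mapPartitions instead of relying on a module-level global:

import h5py as h5

h5file_path = "/absolute/path/to/file"

def readchunks(indices):
    # one file handle per partition; the slices are read while the file is open
    with h5.File(h5file_path, "r") as f5:
        for v in indices:
            yield f5['/mydata'][v, :]

foo = sc.parallelize(range(0, 100), 10).mapPartitions(readchunks)
foo.count()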

Tags: python, apache-spark, hdf5, pyspark