
Q: How to use RDD persist and cache?


Please tell me how to use the RDD methods persist() and cache(). In a conventional program of the kind I usually write in Java, say a Spark Streaming job, which is a continuous execution of a DAG, the value of the RDD gets updated on every batch, so persist()/cache() will also be called again and again and will end up overwriting that RDD.

But as the documentation excerpt below suggests, these methods seem useful only for interactive shells. Is there any way I can use a cached/persisted RDD in my sequential program more efficiently than merely storing the desired RDD in a reference variable?

Spark Doc Link

scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082

scala> linesWithSpark.count()
res8: Long = 19

scala> linesWithSpark.count()
res9: Long = 19

VS

In a sequential Spark Streaming job, I think the following is effectively the same and will not be evaluated again and again.

JavaRDD sortedRDD = baseRDD.filter(f(x));

sortedRDD.count();
sortedRDD.saveAsNewHadoopAPIFile();
// Or Anything we want !   
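For concreteness, here is a rough sketch of the streaming scenario described above (the socket source, the filter predicate, and the output path are hypothetical, not taken from the question). In each micro-batch, foreachRDD hands the job a fresh RDD, so cache() is applied to a new RDD every batch rather than overwriting one long-lived RDD, and the cached data can then be reused by several actions within that batch:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("streaming-cache-sketch")
val ssc = new StreamingContext(conf, Seconds(10))

// Hypothetical input stream; any DStream would do here.
val lines = ssc.socketTextStream("localhost", 9999)

lines.foreachRDD { (rdd, time) =>
  // Each batch yields a new RDD, so this cache() does not clash with
  // the previous batch's cached RDD.
  val filtered = rdd.filter(_.contains("Spark")).cache()

  // Both actions below reuse the cached partitions instead of
  // re-running the filter over the batch.
  println(s"batch ${time.milliseconds}: ${filtered.count()} matching lines")
  filtered.saveAsTextFile(s"/tmp/filtered-${time.milliseconds}")

  // Release the cached partitions once this batch no longer needs them.
  filtered.unpersist()
}

ssc.start()
ssc.awaitTermination()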

I would be grateful if you could help resolve this doubt. Thanks!


Answer 1:

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use. You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it. The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory).

val linesWithSpark = sc.textFile("/home/kishore/test.txt")
linesWithSpark.cache()
linesWithSpark.count()

By themselves, the first two lines do nothing: sc.textFile and RDD.cache are both lazy operations, so the file is still not read. The RDD now just says "read this file and then cache the contents". When linesWithSpark.count runs the first time, the file is loaded, cached, and counted. If you call linesWithSpark.count a second time, the operation uses the cache: it simply takes the data from the cache and counts the lines.
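As a small extension of the snippet above (not part of the original answer): if the default MEMORY_ONLY level is not what you want, persist() accepts an explicit StorageLevel, and unpersist() releases the cached partitions when you are done. A minimal sketch, assuming the same SparkContext sc and test file:

import org.apache.spark.storage.StorageLevel

val linesWithSpark = sc.textFile("/home/kishore/test.txt")

// persist() with an explicit level; cache() is equivalent to
// persist(StorageLevel.MEMORY_ONLY).
linesWithSpark.persist(StorageLevel.MEMORY_AND_DISK)

linesWithSpark.count()   // first action: reads the file and materializes the cache
linesWithSpark.count()   // second action: served from memory (or local disk)

// Free the cached partitions once they are no longer needed.
linesWithSpark.unpersist()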


java  apache-spark  bigdata  spark-streaming