
Q:Apache Spark fails to process a large Cassandra column family

I am trying to use Apache Spark to process my large (~230k entries) Cassandra dataset, but I constantly run into different kinds of errors. However, I can successfully run applications on a dataset of ~200 entries. I have a Spark setup of 3 nodes with 1 master and 2 workers, and the 2 workers also host a Cassandra cluster with the data indexed at a replication factor of 2. My 2 Spark workers show 2.4 and 2.8 GB of memory on the web interface, and I set spark.executor.memory to 2409 (MB) when running an application, to get a combined memory of 4.7 GB. Here is my WebUI homepage:

The environment page of one of the tasks

At this stage, I am simply trying to process the data stored in Cassandra using Spark. Here is the basic code I am using to do this in Java:

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.SparkContextJavaFunctions;
import com.datastax.spark.connector.japi.rdd.CassandraJavaRDD;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

// Connect to Cassandra and ship the application jars to the executors
SparkConf conf = new SparkConf(true)
        .set("spark.cassandra.connection.host", CASSANDRA_HOST)
        .setJars(jars);

SparkContext sc = new SparkContext(HOST, APP_NAME, conf);
SparkContextJavaFunctions context = javaFunctions(sc);

// Full scan of the column family, then count the rows
CassandraJavaRDD<CassandraRow> rdd = context.cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY);

System.out.println(rdd.count());

For a successful run on the small dataset (200 entries), the events interface looks something like this:

But when I run the same thing on the large dataset (i.e. I change only CASSANDRA_COLUMN_FAMILY), the job never terminates; in the terminal, the log looks like this:

and after ~2 minutes, the stderr for the executors looks like this

and after ~7 minutes, I get

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded

in my terminal, and I have to kill the SparkSubmit process manually. However, the large dataset was indexed from a binary file that occupies only 22 MB, and nodetool status shows that only ~115 MB of data is stored in both of my Cassandra nodes. I have also tried to use Spark SQL on my dataset, but got similar results with that too. Where am I going wrong with my setup, and what should I do to successfully process my dataset, both for a transformation/action program and for a program that uses Spark SQL?
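
For reference, the Spark SQL attempt was roughly of the following shape. This is only a simplified sketch using the connector's CassandraSQLContext; the query shown is a placeholder rather than the exact code that was run.

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.cassandra.CassandraSQLContext;

// Simplified sketch of the Spark SQL path; the query is a placeholder
SparkConf conf = new SparkConf(true)
        .set("spark.cassandra.connection.host", CASSANDRA_HOST)
        .setJars(jars);
SparkContext sc = new SparkContext(HOST, APP_NAME, conf);

CassandraSQLContext sqlContext = new CassandraSQLContext(sc);
DataFrame result = sqlContext.sql(
        "SELECT COUNT(*) FROM " + CASSANDRA_KEYSPACE + "." + CASSANDRA_COLUMN_FAMILY);
result.show();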

I have already tried the following methods:

  • Using -Xms1G -Xmx1G to increase memory; the program fails with an exception saying that I should set spark.executor.memory instead, which I have already done.

  • Using spark.cassandra.input.split.size, which fails with a message saying it isn't a valid option and that a similar option is spark.cassandra.input.split.size_in_mb; I set the latter to 1, with no effect.

EDIT

Based on this answer, I have also tried the following methods:

  • Setting spark.storage.memoryFraction to 0

  • Not setting spark.storage.memoryFraction and instead using persist with MEMORY_ONLY, MEMORY_ONLY_SER, MEMORY_AND_DISK and MEMORY_AND_DISK_SER (see the sketch after this list for where each of these settings goes).
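
For completeness, a rough sketch of how these configuration attempts fit together is shown below; the values are the ones quoted above, the two attempts were separate runs (combined here only to show where each setting goes), and the exact layout is illustrative rather than a verbatim copy of the code used.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import com.datastax.spark.connector.japi.CassandraRow;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

// Illustrative combination of the settings tried above (values as quoted)
SparkConf conf = new SparkConf(true)
        .set("spark.cassandra.connection.host", CASSANDRA_HOST)
        .set("spark.executor.memory", "2409m")               // per-executor memory (2409, presumably MB)
        .set("spark.cassandra.input.split.size_in_mb", "1")  // smaller Cassandra input splits
        .set("spark.storage.memoryFraction", "0");           // attempt 1: give no heap to the RDD cache

JavaSparkContext sc = new JavaSparkContext(HOST, APP_NAME, conf);

// Attempt 2: explicit storage level (tried with the default memoryFraction)
JavaRDD<CassandraRow> rdd = javaFunctions(sc)
        .cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY)
        .persist(StorageLevel.MEMORY_AND_DISK_SER());

System.out.println(rdd.count());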

Versions:

  • Spark: 1.4.0

  • Cassandra: 2.1.6

  • spark-cassandra-connector: 1.4.0-M1


Answer 1:

I think there is an issue in the latest spark-cassandra-connector. The parameter spark.cassandra.input.split.size_in_mb is supposed to have a default value of 64 MB, but it is being interpreted as 64 bytes in the code. This causes far too many partitions to be created, which can't be scheduled by Spark. Try setting the conf value to

spark.cassandra.input.split.size_in_mb=67108864
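
For example, if you set connector options through SparkConf, the workaround could be applied as in this minimal sketch (the value is the byte-scaled figure from this answer):

import org.apache.spark.SparkConf;

// Workaround sketch: pass 64 MB expressed as bytes, since the affected
// connector build misreads spark.cassandra.input.split.size_in_mb as bytes
SparkConf conf = new SparkConf(true)
        .set("spark.cassandra.connection.host", CASSANDRA_HOST)
        .set("spark.cassandra.input.split.size_in_mb", "67108864");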

Tags: java, cassandra, apache-spark, apache-spark-sql, spark-cassandra-connector