Q: RDD join: after joining two different pair RDDs, why have the keys and the order of the resulting RDD changed?

I have two pair RDDs, say:

 RDD1 : [(1,a),(2,b),(3,c)]    
 RDD2 : [(1,d),(2,e),(3,f)]

Now I am joining these RDDs using join:

 RDD3 = RDD1.join(RDD2);

I displayed the elements of RDD3 with the code below:

 for (Tuple2<Integer,Tuple2<String,String>> tuple : RDD3.collect())
     System.out.println(tuple._1()+":"+tuple._2()._1()+","+tuple._2()._2());

I see weird results like:

5:b,e
4:a,d 
6:c,f

whereas I expected something like:

1:a,d
1:b,e 
1:c,f

Is there any way to get the desired output shown above, or am I misunderstanding RDD behavior? Please advise.

Edit:

Actually, I am reading the data like this:

JavaDoubleRDD data1 = sc.parallelizeDoubles(Arrays.asList(45.25,22.15,33.24));
JavaDoubleRDD data2 = sc.parallelizeDoubles(Arrays.asList(23.45,19.35,12.45));

and then

JavaPairRDD<Double,Double> lat1 = data1.cartesian(data1);
JavaRDD<Double> lat2 = lat1.map(new Function<Tuple2<Double,Double>,Double>() {
    @Override
    public Double call(Tuple2<Double,Double> t) {
        return Math.pow(t._1()-t._2(),2);
    }
});
// flag and flag1 are static variables initially equal to 1
JavaPairRDD<Integer,Double> lat3 = lat2.mapToPair(new PairFunction<Double,Integer,Double>() {
    @Override
    public Tuple2<Integer,Double> call(Double d) {
        return new Tuple2<Integer,Double>(flag++,d);
    }
});
System.out.println("Latitude values display");
for (Tuple2<?,?> tuple : lat3.collect()) {
    System.out.println(tuple._1()+":"+tuple._2());
}
JavaPairRDD<Double,Double> long1 = data2.cartesian(data2);
JavaRDD<Double> long2 = long1.map(new Function<Tuple2<Double,Double>,Double>() {
    @Override
    public Double call(Tuple2<Double,Double> t) {
        return Math.pow(t._1()-t._2(),2);
    }
});
JavaPairRDD<Integer,Double> long3 = long2.mapToPair(new PairFunction<Double,Integer,Double>() {
    @Override
    public Tuple2<Integer,Double> call(Double d) {
        return new Tuple2<Integer,Double>(flag1++,d);
    }
});
System.out.println("Longitude values display");
for (Tuple2<?,?> tuple : long3.collect()) {
    System.out.println(tuple._1()+":"+tuple._2());
}
System.out.println("latitude and longitude values join");
JavaPairRDD<Integer,Tuple2<Double,Double>> weightmatrix1 = lat3.join(long3);
System.out.println("Weightmatrix1 Display");
for (Tuple2<Integer,Tuple2<Double,Double>> tuple : weightmatrix1.collect()) {
    System.out.println(tuple._1()+":"+tuple._2()._1()+","+tuple._2()._2());
}

So what I am doing is calculating a weight matrix based on latitude and longitude values.

Answer 1:

When I do:

scala> val rdd1 = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
scala> val rdd2 = sc.parallelize(Array((1,"d"),(2,"e"),(3,"f")))
scala> val rdd3 = rdd1.join(rdd2)
scala> rdd3.collect().foreach(println)

I consistently get:

(1,(a,d))
(2,(b,e))
(3,(c,f))
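
Since a plain join keeps the keys intact, the shifted keys in the question most likely come from how the keys are generated rather than from join itself. Spark transformations are lazy, and an RDD that is not cached is recomputed for every action, so a mapping function that increments a static counter hands out new numbers each time it runs. The sketch below imitates that in plain Java without any Spark classes. The class LazyKeyDemo, its methods, and the use of Supplier as a stand-in for a lazy RDD are all hypothetical names chosen for illustration, and it assumes the job runs in local mode where the static counter lives in a single JVM.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java illustration only; no Spark classes are used here.
final class LazyKeyDemo {
    // Stands in for the static `flag` counter from the question.
    static int flag = 1;

    // Hypothetical stand-in for a lazy, uncached RDD:
    // the mapping runs again on every get(), like a recomputation per action.
    static Supplier<List<String>> keyedByCounter(List<String> values) {
        return () -> {
            List<String> out = new ArrayList<>();
            for (String v : values) {
                out.add((flag++) + ":" + v);
            }
            return out;
        };
    }

    // Keys derived from each element's position do not depend on
    // how many times the pipeline has been evaluated.
    static Supplier<List<String>> keyedByIndex(List<String> values) {
        return () -> {
            List<String> out = new ArrayList<>();
            for (int i = 0; i < values.size(); i++) {
                out.add((i + 1) + ":" + values.get(i));
            }
            return out;
        };
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("a", "b", "c");

        Supplier<List<String>> counted = keyedByCounter(values);
        System.out.println(counted.get()); // first evaluation:  [1:a, 2:b, 3:c]
        System.out.println(counted.get()); // second evaluation: [4:a, 5:b, 6:c]

        Supplier<List<String>> indexed = keyedByIndex(values);
        System.out.println(indexed.get()); // [1:a, 2:b, 3:c]
        System.out.println(indexed.get()); // [1:a, 2:b, 3:c]
    }
}
```

In the question's code, the first collect() would correspond to the first evaluation and the join to the second, which would explain keys starting at 4. In Spark itself, one option worth trying is zipWithIndex(), which pairs each element with its index instead of relying on a mutable counter. Another is calling cache() on the keyed RDDs before the first action, although cached partitions can still be recomputed if they are evicted. On a cluster, each executor JVM would have its own copy of the static counter, so the counter approach would likely misbehave in a different way there.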

java  join  apache-spark  rdd