java
1 /** 2 *cogroup与join算子不同的是如果rdd中的一个key,对应多个value,则返回,Iterable > 3 *@author Tele 4 */ 5 public class CogroupDemo { 6 private static SparkConf conf = new SparkConf().setMaster("local").setAppName("congroupdemo"); 7 private static JavaSparkContext jsc = new JavaSparkContext(conf); 8 public static void main(String[] args) { 9 //每个学生有多门成绩10 List > studentList = Arrays.asList(11 new Tuple2 (1,"tele"), 12 new Tuple2 (1,"xx"), 13 new Tuple2 (2,"yeye"), 14 new Tuple2 (3,"wyc")15 );16 17 List > scoreList = Arrays.asList(18 new Tuple2 (1,100),19 new Tuple2 (1,110),20 new Tuple2 (1,120),21 new Tuple2 (2,90),22 new Tuple2 (2,60),23 new Tuple2 (2,50),24 new Tuple2 (3,70),25 new Tuple2 (3,70)26 );27 28 JavaPairRDD studentRDD = jsc.parallelizePairs(studentList);29 JavaPairRDD scoreRDD = jsc.parallelizePairs(scoreList);30 31 JavaPairRDD , Iterable >> result = studentRDD.cogroup(scoreRDD);32 result.foreach(new VoidFunction ,Iterable >>>() {33 34 private static final long serialVersionUID = 1L;35 36 @Override37 public void call(Tuple2 , Iterable >> t) throws Exception {38 System.out.println("学号:" + t._1);39 System.out.println("姓名:" + t._2._1);40 System.out.println("成绩:" + t._2._2);41 42 /* System.out.print("成绩:[");43 t._2._2.forEach(i->System.out.print(i + ","));44 System.out.println("]");45 System.out.println("====================");*/46 47 }48 });49 50 jsc.close();51 }52 }
scala
/**
 * cogroup demo: for every student id, collects all matching values from both
 * RDDs into a pair of Iterables — (names on that key, scores on that key).
 */
object CogroupDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("cogroupdemo")
    val sc = new SparkContext(conf)

    val students = Array((1, "tele"), (2, "yeye"), (3, "wyc"))
    val scores = Array((1, 100), (1, 200), (2, 80), (2, 300), (3, 100))

    val studentRDD = sc.parallelize(students, 1)
    val scoreRDD = sc.parallelize(scores, 1)

    // Destructure each cogrouped record: key plus the two grouped sides.
    studentRDD.cogroup(scoreRDD).foreach { case (id, (names, marks)) =>
      println("学号:" + id)
      println("姓名:" + names.mkString(" "))
      println("成绩:" + marks.mkString(","))
      println("============")
    }
  }
}