SparkSQL plays an important role in day-to-day data development. As requirements grow more complex, we often need nested data structures, so it is worth knowing how to register nested JavaBeans and complex types such as Map as tables. Once that works, complicated structures can be queried with plain SQL, which makes Spark far more approachable. Enough talk; let's verify this with actual code.
First, create a simple JavaBean:
public static class Point implements Serializable {
    private double x;
    private double y;

    public Point(double x, double y) {
        this.x = x;
        this.y = y;
    }

    public Point() {
    }

    public double getX() { return x; }
    public void setX(double x) { this.x = x; }
    public double getY() { return y; }
    public void setY(double y) { this.y = y; }
}

Next, create a nested JavaBean:
public static class Segment implements Serializable {
    private Point from;
    private Point to;

    public Segment(Point from, Point to) {
        this.from = from;
        this.to = to;
    }

    public Segment() {
    }

    public Point getFrom() { return from; }
    public void setFrom(Point from) { this.from = from; }
    public Point getTo() { return to; }
    public void setTo(Point to) { this.to = to; }
}

Then a JavaBean with an array field:
public static class Line implements Serializable {
    private String name;
    private Point[] points;

    public Line(String name, Point[] points) {
        this.name = name;
        this.points = points;
    }

    public Line() {
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Point[] getPoints() { return points; }
    public void setPoints(Point[] points) { this.points = points; }
}

And finally a JavaBean with a complex Map field:
public static class NamedPoints implements Serializable {
    private String name;
    private Map<String, Point> pointMap;

    public NamedPoints() {
    }

    public NamedPoints(String name, Map<String, Point> pointMap) {
        this.name = name;
        this.pointMap = pointMap;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Map<String, Point> getPointMap() { return pointMap; }
    public void setPointMap(Map<String, Point> pointMap) { this.pointMap = pointMap; }
}

With the beans in place, everything else falls into place. (The example assumes the usual imports: java.io.Serializable, java.util.*, and org.apache.spark.sql.*.)
public static void main(String[] args) {
    SparkSession session = SparkSession.builder()
            .appName("complex javabean to table")
            .getOrCreate();

    List<Segment> segments = Arrays.asList(
            new Segment(new Point(1.0, 2.0), new Point(3.0, 4.0)),
            new Segment(new Point(5.0, 6.0), new Point(7.0, 8.0)),
            new Segment(new Point(9.0, 10.0), new Point(11.0, 12.0)));
    Encoder<Segment> encoder = Encoders.bean(Segment.class);
    Dataset<Segment> dataset = session.createDataset(segments, encoder);
    // registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is the equivalent
    dataset.createOrReplaceTempView("segment_table");
    Dataset<Row> sql = session.sql("select t.from.x from segment_table t");
    sql.printSchema();
    sql.show();

    System.out.println("testing map");
    Encoder<NamedPoints> namedPointsEncoder = Encoders.bean(NamedPoints.class);
    Map<String, Point> map = new HashMap<>();
    map.put("p1", new Point(0.0, 1.1));
    Map<String, Point> pointMap = new HashMap<>();
    pointMap.put("p2", new Point(2.2, 2.3));
    pointMap.put("p3", new Point(2.3, 2.5));
    pointMap.put("p4", new Point(2.4, 2.6));
    List<NamedPoints> namedPoints = Arrays.asList(
            new NamedPoints("name_1", map),
            new NamedPoints("name_2", pointMap));
    Dataset<NamedPoints> namedPointsDataset =
            session.createDataset(namedPoints, namedPointsEncoder);

    System.out.println("===============================");
    namedPointsDataset.show();
    System.out.println("===============================");
    try {
        namedPointsDataset.createTempView("name_point");
        Dataset<Row> rowDataset = session.sql("select * from name_point");
        rowDataset.printSchema();
        rowDataset.show();
        System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++");
        Dataset<Row> rowDatasetDetail = session.sql(
                "select t.name, t.pointMap, t.pointMap.key, t.pointMap.value.x from name_point t");
        rowDatasetDetail.printSchema();
        rowDatasetDetail.show();
        System.out.println("++++++++++++++++++++++OVER+++++++++++++++++++++++++");
    } catch (AnalysisException e) {
        e.printStackTrace();
    }
    session.stop();
}

Now let's look at the results (INFO log lines trimmed for readability):
root
 |-- x: double (nullable = true)

+---+
|  x|
+---+
|1.0|
|5.0|
|9.0|
+---+

testing map
===============================
+------+--------------------+
|  name|            pointMap|
+------+--------------------+
|name_1| [p1 -> [0.0, 1.1]]|
|name_2|[p2 -> [2.2, 2.3]...|
+------+--------------------+
===============================
root
 |-- name: string (nullable = true)
 |-- pointMap: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- x: double (nullable = false)
 |    |    |-- y: double (nullable = false)

+------+--------------------+
|  name|            pointMap|
+------+--------------------+
|name_1| [p1 -> [0.0, 1.1]]|
|name_2|[p2 -> [2.2, 2.3]...|
+------+--------------------+
+++++++++++++++++++++++++++++++++++++++++++++++
root
 |-- name: string (nullable = true)
 |-- pointMap: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- x: double (nullable = false)
 |    |    |-- y: double (nullable = false)
 |-- key: struct (nullable = true)
 |    |-- x: double (nullable = false)
 |    |-- y: double (nullable = false)
 |-- x: double (nullable = true)

+------+--------------------+----+----+
|  name|            pointMap| key|   x|
+------+--------------------+----+----+
|name_1| [p1 -> [0.0, 1.1]]|null|null|
|name_2|[p2 -> [2.2, 2.3]...|null|null|
+------+--------------------+----+----+
++++++++++++++++++++++OVER+++++++++++++++++++++++++
From the results: fields of a nested JavaBean can be read directly with dot notation, e.g. t.from.x.
For the Map type, the map column can be selected as a whole, but the attempted key/value lookups (t.pointMap.key and t.pointMap.value.x) all come back as null.
Maybe my syntax is wrong, or maybe SparkSQL's support here is still incomplete; suggestions are welcome. This is my first blog post, so please go easy on me.
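A likely explanation for those nulls: when dot notation is applied to a map column, SparkSQL treats the name after the dot as a literal map key to look up. So t.pointMap.key looks up the key "key" (which no row contains) rather than listing the map's keys, and t.pointMap.value behaves the same way. Under that assumption, bracket access and explode should return real values. A minimal sketch, continuing from the name_point view registered above (hypothetical follow-up, assumes spark-sql 2.x on the classpath):

```java
// Look up one concrete key with bracket syntax. This yields null only for
// rows whose map genuinely lacks the key "p1" (here: the name_2 row).
session.sql("select t.name, t.pointMap['p1'].x from name_point t").show();

// Flatten the map into one row per entry with LATERAL VIEW explode, which
// exposes real key and value columns instead of literal-key lookups.
session.sql(
        "select t.name, e.key, e.value.x, e.value.y "
      + "from name_point t lateral view explode(t.pointMap) e as key, value")
       .show();
```

If this interpretation is right, the nulls in the output above are expected behavior rather than a bug: the query simply asked for map entries named "key" and "value" that were never inserted.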
Author: gaogf
Link: https://www.jianshu.com/p/4cd45d57183c