
Converting a Java list of maps to a Dataset in Spark

德瑪西亞99 2023-05-17 15:56:48
I have a list of maps in Java that essentially represents rows:

List<Map<String, Object>> dataList = new ArrayList<>();
Map<String, Object> row1 = new HashMap<>();
row1.put("fund", "f1");
row1.put("broker", "b1");
row1.put("qty", 100);
Map<String, Object> row2 = new HashMap<>();
row2.put("fund", "f2");
row2.put("broker", "b2");
row2.put("qty", 200);
dataList.add(row1);
dataList.add(row2);

I am trying to create a Spark DataFrame from it. I converted it to a JavaRDD<Map<String, Object>> with

JavaRDD<Map<String, Object>> rows = sc.parallelize(dataList);

but I am not sure how to get from there to a Dataset<Row>. I have seen Scala examples, but none in Java.

I also tried converting the list to a JSON string and reading that:

String jsonStr = mapper.writeValueAsString(dataList);

but it seems I would have to write it to a file and then read it back with

Dataset<Row> df = spark.read().json(pathToFile);

If possible, I would rather do this in memory than write to a file and read from there. Here is what I have so far:

SparkConf sparkConf = new SparkConf().setAppName("SparkTest").setMaster("local[*]")
        .set("spark.sql.shuffle.partitions", "1");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();

List<Map<String, Object>> dataList = new ArrayList<>();
Map<String, Object> row1 = new HashMap<>();
row1.put("fund", "f1");
row1.put("broker", "b1");
row1.put("qty", 100);
Map<String, Object> row2 = new HashMap<>();
row2.put("fund", "f2");
row2.put("broker", "b2");
row2.put("qty", 200);
dataList.add(row1);
dataList.add(row2);

ObjectMapper mapper = new ObjectMapper();
String jsonStr = mapper.writeValueAsString(dataList);

JavaRDD<Map<String, Object>> rows = sc.parallelize(dataList);
Dataset<Row> data = sparkSession.createDataFrame(rows, Map.class);
data.show();

4 Answers

胡說叔叔


You don't need an RDD at all. Extract the desired schema from the list of maps, convert the list of maps into a list of Rows, and then use spark.createDataFrame.


In Java this is a bit painful, especially when creating the Row objects, but here is how it goes:


import java.util.*;
import java.util.stream.Collectors;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;
import scala.collection.JavaConverters;

List<String> cols = new ArrayList<>(dataList.get(0).keySet());

List<Row> rows = dataList

    .stream()

    .map(row -> cols.stream().map(c -> (Object) String.valueOf(row.get(c)))) // String.valueOf avoids an NPE on null values

    .map(row -> row.collect(Collectors.toList()))

    .map(row -> JavaConverters.asScalaBufferConverter(row).asScala().toSeq())

    .map(Row$.MODULE$::fromSeq)

    .collect(Collectors.toList());


StructType schema = new StructType(

    cols.stream()

        .map(c -> new StructField(c, DataTypes.StringType, true, new Metadata()))

        .collect(Collectors.toList())

        .toArray(new StructField[0])

);

Dataset<Row> result = spark.createDataFrame(rows, schema);
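If the round trip through Scala's collection converters feels heavy, the Row list can also be built in plain Java with org.apache.spark.sql.RowFactory. A sketch under the same assumptions as the answer above (column order taken from the first map's keys, every value rendered as a string; the helper class name is mine):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

public class RowFactoryExample {

    public static List<Row> toRows(List<Map<String, Object>> dataList) {
        // Column order taken from the first map's keys, as above.
        List<String> cols = new ArrayList<>(dataList.get(0).keySet());

        // RowFactory.create takes Object..., so the values array is passed directly.
        // String.valueOf avoids an NPE on null values.
        return dataList.stream()
                .map(m -> RowFactory.create(cols.stream()
                        .map(c -> (Object) String.valueOf(m.get(c)))
                        .toArray()))
                .collect(Collectors.toList());
    }
}
```

The resulting list plugs straight into spark.createDataFrame(rows, schema) with the all-strings schema built above.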


Answered 2023-05-17
慕桂英546537


The Spark documentation already covers loading an in-memory JSON string.


Here is the example from https://spark.apache.org/docs/latest/sql-data-sources-json.html:


// Alternatively, a DataFrame can be created for a JSON dataset represented by

// a Dataset<String> storing one JSON object per string.

List<String> jsonData = Arrays.asList(

        "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");

Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());

Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);

anotherPeople.show();

// +---------------+----+

// |        address|name|

// +---------------+----+

// |[Columbus,Ohio]| Yin|

// +---------------+----+
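The same approach works directly on the question's dataList with no temporary file: serialize each map with Jackson (the question already uses ObjectMapper), wrap the strings in a Dataset<String>, and let spark.read().json(...) infer the schema. A sketch; the class and method names are mine:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JsonInMemory {

    public static Dataset<Row> toDataFrame(SparkSession spark, List<Map<String, Object>> dataList) {
        ObjectMapper mapper = new ObjectMapper();

        // One JSON object per element, e.g. {"fund":"f1","broker":"b1","qty":100}
        List<String> jsonRows = dataList.stream()
                .map(m -> {
                    try {
                        return mapper.writeValueAsString(m);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                })
                .collect(Collectors.toList());

        // Dataset<String> of JSON objects -> DataFrame, entirely in memory.
        return spark.read().json(spark.createDataset(jsonRows, Encoders.STRING()));
    }
}
```

One caveat: the JSON reader infers integer fields such as qty as bigint, not int.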


Answered 2023-05-17
慕標5832272

First define a JavaBean that represents one row:

public class MyRow implements Serializable {


  private String fund;

  private String broker;

  private int qty;


  public MyRow(String fund, String broker, int qty) {

    super();

    this.fund = fund;

    this.broker = broker;

    this.qty = qty;

  }


  public String getFund() {

    return fund;

  }


  public void setFund(String fund) {

    this.fund = fund;

  }



  public String getBroker() {

    return broker;

  }


  public void setBroker(String broker) {

    this.broker = broker;

  }


  public int getQty() {

    return qty;

  }


  public void setQty(int qty) {

    this.qty = qty;

  }


}

Now create an ArrayList. Each item in this list will become a row in the final DataFrame.


MyRow r1 = new MyRow("f1", "b1", 100);

MyRow r2 = new MyRow("f2", "b2", 200);

List<MyRow> dataList = new ArrayList<>();

dataList.add(r1);

dataList.add(r2);

Now convert this list into a Dataset:


Dataset<Row> ds = spark.createDataFrame(dataList, MyRow.class);

ds.show();
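Since MyRow is a JavaBean, the data can also be kept strongly typed with Encoders.bean instead of the untyped Dataset<Row>. A sketch (the bean is repeated here with the extra public no-arg constructor that Spark's bean encoder needs for deserialization; the class and method names are mine):

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class TypedExample {

    // Minimal copy of the answer's bean, plus the no-arg constructor
    // that Encoders.bean needs when it deserializes rows back to objects.
    public static class MyRow implements Serializable {
        private String fund;
        private String broker;
        private int qty;

        public MyRow() {}
        public MyRow(String fund, String broker, int qty) {
            this.fund = fund;
            this.broker = broker;
            this.qty = qty;
        }
        public String getFund() { return fund; }
        public void setFund(String fund) { this.fund = fund; }
        public String getBroker() { return broker; }
        public void setBroker(String broker) { this.broker = broker; }
        public int getQty() { return qty; }
        public void setQty(int qty) { this.qty = qty; }
    }

    public static Dataset<MyRow> typedDataset(SparkSession spark) {
        List<MyRow> dataList = Arrays.asList(new MyRow("f1", "b1", 100),
                                             new MyRow("f2", "b2", 200));
        // Encoders.bean keeps the Dataset strongly typed end to end.
        Dataset<MyRow> typed = spark.createDataset(dataList, Encoders.bean(MyRow.class));
        // The cast picks the FilterFunction overload of filter in Java.
        return typed.filter((FilterFunction<MyRow>) r -> r.getQty() > 150);
    }
}
```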


Answered 2023-05-17
慕姐4208626

A variant that infers each column's type from the values in the first map:

import java.util.*;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;

// `list` below is the question's List<Map<String, Object>> dataList
private static JavaRDD<Map<String, Object>> rows;

// TreeMap sorts the keys, so the value order matches the schema built below
private static final Function<Map<String, Object>, Row> f =
        strObjMap -> RowFactory.create(new TreeMap<>(strObjMap).values().toArray(new Object[0]));

public void test(){

    rows = sc.parallelize(list);

    JavaRDD<Row> rowRDD = rows.map(f);

    Map<String, Object> headMap = list.get(0);

    TreeMap<String, Object> headerMap = new TreeMap<>(headMap);

    List<StructField> fields = new ArrayList<>();

    StructField field;

    for (String key : headerMap.keySet()) {

        System.out.println("key:::"+key);

        Object value = list.get(0).get(key);

        if (value instanceof Integer) {

            field = DataTypes.createStructField(key, DataTypes.IntegerType, true);

        }

        else if (value instanceof Double) {

            field = DataTypes.createStructField(key, DataTypes.DoubleType, true);

        }

    else if (value instanceof java.sql.Date || value instanceof java.util.Date) {

            field = DataTypes.createStructField(key, DataTypes.DateType, true);

        }

        else {

            field = DataTypes.createStructField(key, DataTypes.StringType, true);

        }

            fields.add(field);

    }

    StructType struct = DataTypes.createStructType(fields);

    Dataset<Row> data = this.spark.createDataFrame(rowRDD, struct);

}


Answered 2023-05-17