
Converting a list of Maps in Java to a Dataset in Spark

德玛西亚99 2023-05-17 15:56:48
I have a list of Maps in Java that basically represents rows:

List<Map<String, Object>> dataList = new ArrayList<>();
Map<String, Object> row1 = new HashMap<>();
row1.put("fund", "f1");
row1.put("broker", "b1");
row1.put("qty", 100);
Map<String, Object> row2 = new HashMap<>();
row2.put("fund", "f2");
row2.put("broker", "b2");
row2.put("qty", 200);
dataList.add(row1);
dataList.add(row2);

I am trying to create a Spark DataFrame from it. I tried converting it to a JavaRDD<Map<String, Object>> with

JavaRDD<Map<String, Object>> rows = sc.parallelize(dataList);

but I am not sure how to get from there to a Dataset<Row>. I have seen Scala examples, but none in Java.

I also tried converting the list to a JSON string:

String jsonStr = mapper.writeValueAsString(dataList);

but it seems I would then have to write it to a file and read it back with

Dataset<Row> df = spark.read().json(pathToFile);

If possible, I would rather do this in memory than write to a file and read from there. My full test code:

SparkConf sparkConf = new SparkConf().setAppName("SparkTest").setMaster("local[*]")
        .set("spark.sql.shuffle.partitions", "1");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();

List<Map<String, Object>> dataList = new ArrayList<>();
Map<String, Object> row1 = new HashMap<>();
row1.put("fund", "f1");
row1.put("broker", "b1");
row1.put("qty", 100);
Map<String, Object> row2 = new HashMap<>();
row2.put("fund", "f2");
row2.put("broker", "b2");
row2.put("qty", 200);
dataList.add(row1);
dataList.add(row2);

ObjectMapper mapper = new ObjectMapper();
String jsonStr = mapper.writeValueAsString(dataList);

JavaRDD<Map<String, Object>> rows = sc.parallelize(dataList);
Dataset<Row> data = sparkSession.createDataFrame(rows, Map.class);
data.show();

4 Answers

胡说叔叔


You don't need to use an RDD at all. What you need to do is extract the desired schema from the list of maps, convert the list of maps into a list of Rows, and then call spark.createDataFrame.


In Java this is a bit painful, especially when building the Row objects, but here is how it goes:


// Requires (imports): java.util.stream.Collectors, scala.collection.JavaConverters,
// org.apache.spark.sql.Row, org.apache.spark.sql.Row$, org.apache.spark.sql.types.*

// Use the key set of the first map as the column order.
List<String> cols = new ArrayList<>(dataList.get(0).keySet());

// Turn each map into a Row: stringify every value, convert the Java list
// to a Scala Seq, and build the Row from it.
List<Row> rows = dataList
    .stream()
    .map(row -> cols.stream().map(c -> (Object) row.get(c).toString()))
    .map(row -> row.collect(Collectors.toList()))
    .map(row -> JavaConverters.asScalaBufferConverter(row).asScala().toSeq())
    .map(Row$.MODULE$::fromSeq)
    .collect(Collectors.toList());

// Declare every column as a nullable string.
StructType schema = new StructType(
    cols.stream()
        .map(c -> new StructField(c, DataTypes.StringType, true, new Metadata()))
        .collect(Collectors.toList())
        .toArray(new StructField[0])
);

Dataset<Row> result = spark.createDataFrame(rows, schema);
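
If the Scala interop (JavaConverters, Row$.MODULE$) feels heavy, the row construction can also be written with Spark's RowFactory. A minimal sketch, assuming the same dataList and cols as above:

// Build each Row from the ordered column values via RowFactory.create(Object...);
// String.valueOf also tolerates null values, unlike toString().
List<Row> rows = dataList.stream()
    .map(m -> RowFactory.create(cols.stream()
            .map(c -> (Object) String.valueOf(m.get(c)))
            .toArray()))
    .collect(Collectors.toList());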


慕桂英546537


The Spark documentation already shows how to load an in-memory JSON string.


Here is the example from https://spark.apache.org/docs/latest/sql-data-sources-json.html:


// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset<String> storing one JSON object per string.
List<String> jsonData = Arrays.asList(
        "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
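
Applied to the question's data, this means the Jackson output can go straight into Spark without touching a file. A minimal sketch, assuming the mapper, dataList and sparkSession from the question; Spark's JSON reader expands a top-level JSON array into one row per element:

// Serialize the list of maps to a single JSON array string in memory.
String jsonStr = mapper.writeValueAsString(dataList);
// Wrap it in a one-element Dataset<String> and let Spark parse it.
Dataset<String> jsonDataset =
        sparkSession.createDataset(java.util.Collections.singletonList(jsonStr), Encoders.STRING());
Dataset<Row> df = sparkSession.read().json(jsonDataset);
df.show();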


慕标5832272


Define a bean class that represents one row:

public class MyRow implements Serializable {

  private String fund;
  private String broker;
  private int qty;

  public MyRow(String fund, String broker, int qty) {
    this.fund = fund;
    this.broker = broker;
    this.qty = qty;
  }

  public String getFund() {
    return fund;
  }

  public void setFund(String fund) {
    this.fund = fund;
  }

  public String getBroker() {
    return broker;
  }

  public void setBroker(String broker) {
    this.broker = broker;
  }

  public int getQty() {
    return qty;
  }

  public void setQty(int qty) {
    this.qty = qty;
  }
}

Now create an ArrayList. Each item in this list will act as a row in the final DataFrame.


MyRow r1 = new MyRow("f1", "b1", 100);
MyRow r2 = new MyRow("f2", "b2", 200);
List<MyRow> dataList = new ArrayList<>();
dataList.add(r1);
dataList.add(r2);

Now we have to convert this list into a Dataset:


Dataset<Row> ds = spark.createDataFrame(dataList, MyRow.class);
ds.show();
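
If a typed Dataset is preferred over an untyped Dataset<Row>, the same bean class should also work with a bean encoder. A minimal sketch under that assumption:

// Build a strongly typed Dataset<MyRow> from the same list.
Dataset<MyRow> typed = spark.createDataset(dataList, Encoders.bean(MyRow.class));
typed.show();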


慕姐4208626


import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

private static JavaRDD<Map<String, Object>> rows;

// Sort each map's entries by key so the values line up with the
// (also sorted) header fields built below.
private static final Function<Map<String, Object>, Row> f =
        strObjMap -> RowFactory.create(new TreeMap<>(strObjMap).values().toArray(new Object[0]));

public void test() {
    // "list" is the List<Map<String, Object>> from the question;
    // "sc" and "spark" are the contexts set up there.
    rows = sc.parallelize(list);
    JavaRDD<Row> rowRDD = rows.map(f);
    Map<String, Object> headMap = list.get(0);
    TreeMap<String, Object> headerMap = new TreeMap<>(headMap);
    List<StructField> fields = new ArrayList<>();
    StructField field;
    for (String key : headerMap.keySet()) {
        System.out.println("key:::" + key);
        // Infer the Spark type for each column from the first row's value.
        Object value = list.get(0).get(key);
        if (value instanceof Integer) {
            field = DataTypes.createStructField(key, DataTypes.IntegerType, true);
        } else if (value instanceof Double) {
            field = DataTypes.createStructField(key, DataTypes.DoubleType, true);
        } else if (value instanceof java.util.Date) {
            field = DataTypes.createStructField(key, DataTypes.DateType, true);
        } else {
            field = DataTypes.createStructField(key, DataTypes.StringType, true);
        }
        fields.add(field);
    }
    StructType struct = DataTypes.createStructType(fields);
    Dataset<Row> data = this.spark.createDataFrame(rowRDD, struct);
}
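
Note what the TreeMap buys here: both the header and every row are sorted by key, so the values stay aligned with the schema no matter how the original HashMaps iterate. On the question's data the inferred schema can be verified with printSchema (a sketch assuming the Dataset built in test() is exposed, e.g. returned; expected output shown as comments):

data.printSchema();
// root
//  |-- broker: string (nullable = true)
//  |-- fund: string (nullable = true)
//  |-- qty: integer (nullable = true)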

