为了账号安全,请及时绑定邮箱和手机立即绑定

使用 java 将索引列添加到 apache spark Dataset<Row>

使用 java 将索引列添加到 apache spark Dataset<Row>

手掌心 2022-11-30 13:50:10
下面的问题有 scala 和 pyspark 的解决方案,这个问题中提供的解决方案不适用于连续的索引值。Spark Dataframe:如何添加索引列:又名分布式数据索引我在 Apache-spark 中有一个现有数据集,我想根据索引从中选择一些行。我打算添加一个索引列,其中包含从 1 开始的唯一值,并根据该列的值获取行。我发现以下方法可以添加使用排序依据的索引:df.withColumn("index", functions.row_number().over(Window.orderBy("a column")));我不想使用排序依据。我需要索引的顺序与它们在数据集中的顺序相同。有什么帮助吗?
查看完整描述

2 回答

?
阿晨1998

TA贡献2037条经验 获得超6个赞

据我所知,您正在尝试将索引(具有连续值)添加到数据框。不幸的是,在 Spark 中没有内置函数可以做到这一点。您只能使用 df.withColumn("index", ) 添加递增索引(但不一定具有连续值monotonicallyIncreasingId)。


尽管如此,RDD API 中有一个zipWithIndex函数可以完全满足您的需要。因此,我们可以定义一个函数,将数据帧转换为 RDD,添加索引并将其转换回数据帧。


我不是 java 中 spark 的专家(scala 更紧凑)所以可能会做得更好。这是我会怎么做。


public static Dataset<Row> zipWithIndex(Dataset<Row> df, String name) {

    JavaRDD<Row> rdd = df.javaRDD().zipWithIndex().map(t -> {

        Row r = t._1;

        Long index = t._2 + 1;

        ArrayList<Object> list = new ArrayList<>();

        r.toSeq().iterator().foreach(x -> list.add(x));

        list.add(index);

        return RowFactory.create(list);

    });

    StructType newSchema = df.schema()

            .add(new StructField(name, DataTypes.LongType, true, null));

    return df.sparkSession().createDataFrame(rdd, newSchema);

}

以下是您将如何使用它。请注意内置 spark 函数的作用与我们的方法的作用形成对比。


Dataset<Row> df = spark.range(5)

    .withColumn("index1", functions.monotonicallyIncreasingId());

Dataset<Row> result = zipWithIndex(df, "good_index");

// df

+---+-----------+

| id|     index1|

+---+-----------+

|  0|          0|

|  1| 8589934592|

|  2|17179869184|

|  3|25769803776|

|  4|25769803777|

+---+-----------+


// result

+---+-----------+----------+

| id|     index1|good_index|

+---+-----------+----------+

|  0|          0|         1|

|  1| 8589934592|         2|

|  2|17179869184|         3|

|  3|25769803776|         4|

|  4|25769803777|         5|

+---+-----------+----------+


查看完整回答
反对 回复 2022-11-30
?
UYOU

TA贡献1878条经验 获得超4个赞

上面的答案经过一些调整对我有用。下面是一个功能性的 Intellij Scratch 文件。我在 Spark 2.3.0 上:


import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.RowFactory;

import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.functions;

import org.apache.spark.sql.types.DataTypes;

import org.apache.spark.sql.types.Metadata;

import org.apache.spark.sql.types.StructField;

import org.apache.spark.sql.types.StructType;


import java.util.ArrayList;


class Scratch {

    public static void main(String[] args) {

        SparkSession spark = SparkSession

                    .builder()

                    .appName("_LOCAL")

                    .master("local")

                    .getOrCreate();

        Dataset<Row> df = spark.range(5)

                .withColumn("index1", functions.monotonicallyIncreasingId());

        Dataset<Row> result = zipWithIndex(df, "good_index");

        result.show();

    }

    public static Dataset<Row> zipWithIndex(Dataset<Row> df, String name) {

        JavaRDD<Row> rdd = df.javaRDD().zipWithIndex().map(t -> {

            Row r = t._1;

            Long index = t._2 + 1;

            ArrayList<Object> list = new ArrayList<>();

            scala.collection.Iterator<Object> iterator = r.toSeq().iterator();

            while(iterator.hasNext()) {

                Object value = iterator.next();

                assert value != null;

                list.add(value);

            }

            list.add(index);

            return RowFactory.create(list.toArray());

        });

        StructType newSchema = df.schema()

                .add(new StructField(name, DataTypes.LongType, true, Metadata.empty()));

        return df.sparkSession().createDataFrame(rdd, newSchema);

    }

}

输出:


+---+------+----------+

| id|index1|good_index|

+---+------+----------+

|  0|     0|         1|

|  1|     1|         2|

|  2|     2|         3|

|  3|     3|         4|

|  4|     4|         5|

+---+------+----------+


查看完整回答
反对 回复 2022-11-30
  • 2 回答
  • 0 关注
  • 169 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号