为了账号安全,请及时绑定邮箱和手机立即绑定

如何将CSV文件转换为RDD

如何将CSV文件转换为RDD

浮云间 2019-10-28 15:01:25
我是新来的火花。我想对CSV记录中的特定数据执行一些操作。我正在尝试读取CSV文件并将其转换为RDD。我的进一步操作基于CSV文件中提供的标题。(摘自评论)到目前为止,这是我的代码:final JavaRDD<String> File = sc.textFile(Filename).cache();final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() {     @Override public Iterable<String> call(String s) {     return Arrays.asList(EOL.split(s));     } });final String heading=lines.first().toString();我可以获得这样的标题值。我想将此映射到CSV文件中的每个记录。final String[] header=heading.split(" "); 我可以获得这样的标题值。我想将此映射到CSV文件中的每个记录。在Java中,我CSVReader record.getColumnValue(Column header)用来获取特定值。我需要做类似这里的事情。
查看完整描述

3 回答

?
哆啦的时光机

TA贡献1779条经验 获得超6个赞

一种简单的方法是拥有一种保留标头的方法。


假设您有一个file.csv,例如:


user, topic, hits

om,  scala, 120

daniel, spark, 80

3754978, spark, 1

我们可以定义一个标头类,该标头类使用第一行的解析版本:


class SimpleCSVHeader(header:Array[String]) extends Serializable {

  val index = header.zipWithIndex.toMap

  def apply(array:Array[String], key:String):String = array(index(key))

}

我们可以使用该标头来处理以后的数据:


val csv = sc.textFile("file.csv")  // original file

val data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines in rows

val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line

val rows = data.filter(line => header(line,"user") != "user") // filter the header out

val users = rows.map(row => header(row,"user")

val usersByHits = rows.map(row => header(row,"user") -> header(row,"hits").toInt)

...

请注意,header仅仅不过是助记符到数组索引的简单映射。几乎所有这些操作都可以在数组中元素的顺序位置上完成,例如user = row(0)


PS:欢迎来到Scala :-)


查看完整回答
反对 回复 2019-10-28
  • 3 回答
  • 0 关注
  • 1244 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信