为了账号安全,请及时绑定邮箱和手机立即绑定

PySpark SQL 覆盖返回空表

PySpark SQL 覆盖返回空表

慕森王 2023-03-30 09:34:07
我正在迁移表中的一些数据,我正在尝试更改“日期”列的值,但 PySpark 似乎在读取数据时删除了数据。我正在执行以下步骤:从表中读取数据更改列的值将数据覆盖到同一张表当我在这些步骤之后检查数据时,我的表是空的。这是我的代码table = "MY_TABLE" data_input = sqlContext.read.format("jdbc").options(url=JDBCURL, dbtable=table).load()print("data_input.count()=", data_input.count())print("'2019' in data_input:", data_input.where(col("date").contains("2019")).count())print("'YEAR' in data_input:", data_input.where(col("date").contains("YEAR")).count())# data_input.count()= 1000# '2019' in data_input: 1000# 'YEAR' in data_input: 0data_output = data_input.withColumn("date", F.regexp_replace("date", "2019", "YEAR"))print("data_output.count()=", data_output.count())print("'2019' in data_output:", data_output.where(col("date").contains("2019")).count())print("'YEAR' in data_output:", data_output.where(col("date").contains("YEAR")).count())# data_output.count()= 1000# '2019' in data_output: 1000# 'YEAR' in data_output: 0到目前为止一切顺利,让我们覆盖表格df_writer = DataFrameWriter(data_output)df_writer.jdbc(url = JDBCURL, table=table, mode="overwrite")# Let's check the data nowprint("data_input.count()=", data_input.count())print("'2019' in data_input:", data_input.where(col("date").contains("2019")).count())print("'YEAR' in data_input:", data_input.where(col("date").contains("YEAR")).count())# data_input.count()= 0# '2019' in data_input: 0# 'YEAR' in data_input: 0# huh, weirdprint("data_output.count()=", data_output.count())print("'2019' in data_output:", data_output.where(col("date").contains("2019")).count())print("'YEAR' in data_output:", data_output.where(col("date").contains("YEAR")).count())# data_output.count()= 0# '2019' in data_output: 0# 'YEAR' in data_output: 0# Still weird查询SELECT * FROM MY_TABLE返回 0 行。为什么 [Py]Spark 这样做?我怎样才能改变这种行为?缓存?这在文档中有什么解释?
查看完整描述

2 回答

?
红糖糍粑

TA贡献1815条经验 获得超6个赞

我只是遇到了同样的问题并.cache()在阅读表格后添加为我修复了它,正如那里所解释的那样:

data_input = sqlContext.read.format("jdbc").options(url=JDBCURL, dbtable=table).cache()


data_output = [ do something with data_input ]


data_output.write.jdbc(url = JDBCURL, table=table, mode="overwrite")


查看完整回答
反对 回复 2023-03-30
?
慕尼黑5688855

TA贡献1848条经验 获得超2个赞

我通过“缓存”数据框找到了解决方法:


data_pandas = data_output.toPandas()

data_spark = spark.createDataFrame(data_pandas)

data_spark.write.jdbc(url=JDBCURL, table=table, mode="overwrite")


查看完整回答
反对 回复 2023-03-30
  • 2 回答
  • 0 关注
  • 76 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信