为了账号安全,请及时绑定邮箱和手机立即绑定

Pyspark 无法从 pathlib 对象加载

Pyspark 无法从 pathlib 对象加载

青春有我 2023-12-12 21:31:54
Python Version 3.7.5Spark Version 3.0Databricks Runtime 7.3我目前正在使用数据湖文件系统中的路径。这是p = dbutils.fs.ls('dbfs:/databricks-datasets/nyctaxi')print(p) [FileInfo(path='dbfs:/databricks-datasets/nyctaxi/readme_nyctaxi.txt', name='readme_nyctaxi.txt', size=916), FileInfo(path='dbfs:/databricks-datasets/nyctaxi/reference/', name='reference/', size=0), FileInfo(path='dbfs:/databricks-datasets/nyctaxi/taxizone/', name='taxizone/', size=0), FileInfo(path='dbfs:/databricks-datasets/nyctaxi/tripdata/', name='tripdata/', size=0)]现在,为了将其转换为有效的 Pathlib Posix 对象,我通过函数传递它def create_valid_path(paths):    return Path('/dbfs').joinpath(*[part for part in Path(paths).parts[1:]])的输出tripdata是PosixPath('/dbfs/databricks-datasets/nyctaxi/tripdata')现在,如果我想在将 csv 的子集收集到列表中后将其读入 Sparkdata 框架。from pyspark.sql.functions import * df = spark.read.format('csv').load(paths)这返回AttributeError: 'PosixPath' object has no attribute '_get_object_id'现在,我可以让它工作的唯一方法是手动添加路径dbfs:/..并将每个项目返回到字符串,但是有必要使用 Pathlib 来执行一些基本的 I/O 操作。我是否遗漏了一些简单的东西,或者 Pyspark 根本无法读取 pathlib 对象?例如trip_paths_str = [str(Path('dbfs:').joinpath(*part.parts[2:])) for part in trip_paths]print(trip_paths_str)['dbfs:/databricks-datasets/nyctaxi/tripdata/fhv/fhv_tripdata_2015-01.csv.gz', 'dbfs:/databricks-datasets/nyctaxi/tripdata/fhv/fhv_tripdata_2015-02.csv.gz'...]
查看完整描述

1 回答

?
慕标琳琳

TA贡献1830条经验 获得超9个赞

那么这样做怎么样?


from pyspark.sql.functions import * 

import os


def db_list_files(file_path):

  file_list = [file.path for file in dbutils.fs.ls(file_path) if os.path.basename(file.path)]

  return file_list


files = db_list_files('dbfs:/FileStore/tables/')

 

df = spark.read.format('text').load(files)

df.show()


查看完整回答
反对 回复 2023-12-12
  • 1 回答
  • 0 关注
  • 65 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信