2 Answers

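Contributor with 1828 experience points, 3+ likes
parquet-tools cannot change the physical type from INT96 to INT64. What you see in the JSON output is the string representation of a timestamp stored in the INT96 timestamp type. You need Spark to rewrite this Parquet file with the timestamp stored as an INT64 timestamp type; the JSON output will then contain a numeric timestamp (in the format you want).
To do that, set this configuration option when starting Spark: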
spark-shell --conf spark.sql.parquet.outputTimestampType=TIMESTAMP_MICROS
2020-03-16 11:37:50 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.0.20:4040
Spark context available as 'sc' (master = local[*], app id = local-1584383875924).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val sourceDf = spark.read.parquet("original-file.snappy.parquet")
2020-03-16 11:38:31 WARN Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
sourceDf: org.apache.spark.sql.DataFrame = [application: struct<name: string, upgrades: struct<value: double> ... 3 more fields>, timestamp: timestamp ... 16 more fields]
scala> sourceDf.repartition(1).write.parquet("Downloads/output")
parquet-tools now shows the correct timestamp type:
parquet-tools schema Downloads/output/part-00000-edba239b-e696-4b4e-8fd3-c7cca9eea6bf-c000.snappy.parquet
message spark_schema {
...
optional binary _id (UTF8);
optional int64 timestamp (TIMESTAMP_MICROS);
...
}
and the JSON dump gives:
parquet-tools cat --json Downloads/output/part-00000-edba239b-e696-4b4e-8fd3-c7cca9eea6bf-c000.snappy.parquet
{..."_id":"101836", "timestamp":1583973827000000}
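The timestamp is recorded in microseconds since the Unix epoch, as the TIMESTAMP_MICROS annotation indicates. Hope this helps!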
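To sanity-check a value such as 1583973827000000 from the JSON dump, it can be converted back into a readable UTC time. The following is only a minimal sketch: the helper name FormatMicrosUtc is made up for this illustration, and it assumes a non-negative TIMESTAMP_MICROS value (microseconds since the Unix epoch).

```cpp
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <ctime>
#include <string>

// Hypothetical helper: format a Parquet TIMESTAMP_MICROS value
// (microseconds since the Unix epoch) as an ISO-8601 UTC string.
std::string FormatMicrosUtc(int64_t micros) {
  assert(micros >= 0);  // assumption: pre-1970 values are not handled here
  const int64_t kMicrosPerSecond = 1000000;

  // Split into whole seconds and the sub-second remainder.
  const std::time_t seconds = static_cast<std::time_t>(micros / kMicrosPerSecond);
  const long long fraction = static_cast<long long>(micros % kMicrosPerSecond);

  // Break the seconds down into calendar fields in UTC.
  const std::tm utc = *std::gmtime(&seconds);

  char date[32];
  std::strftime(date, sizeof(date), "%Y-%m-%dT%H:%M:%S", &utc);

  // Append the six-digit microsecond fraction.
  char out[48];
  std::snprintf(out, sizeof(out), "%s.%06lldZ", date, fraction);
  return std::string(out);
}
```

Calling FormatMicrosUtc(1583973827000000LL) yields "2020-03-12T00:43:47.000000Z", which confirms that the number is a count of microseconds rather than nanoseconds or milliseconds.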

Contributor with 1824 experience points, 6+ likes
Doug, this code from arrow/cpp/src/parquet/types.h shows how Int96 timestamps are stored internally:
constexpr int64_t kJulianToUnixEpochDays = INT64_C(2440588);
constexpr int64_t kSecondsPerDay = INT64_C(60 * 60 * 24);
constexpr int64_t kMillisecondsPerDay = kSecondsPerDay * INT64_C(1000);
constexpr int64_t kMicrosecondsPerDay = kMillisecondsPerDay * INT64_C(1000);
constexpr int64_t kNanosecondsPerDay = kMicrosecondsPerDay * INT64_C(1000);
MANUALLY_ALIGNED_STRUCT(1) Int96 { uint32_t value[3]; };
STRUCT_END(Int96, 12);
static inline void Int96SetNanoSeconds(parquet::Int96& i96, int64_t nanoseconds) {
  std::memcpy(&i96.value, &nanoseconds, sizeof(nanoseconds));
}
static inline int64_t Int96GetNanoSeconds(const parquet::Int96& i96) {
  // We do the computations in the unsigned domain to avoid undefined behaviour
  // on overflow.
  uint64_t days_since_epoch =
      i96.value[2] - static_cast<uint64_t>(kJulianToUnixEpochDays);
  uint64_t nanoseconds = 0;
  memcpy(&nanoseconds, &i96.value, sizeof(uint64_t));
  return static_cast<int64_t>(days_since_epoch * kNanosecondsPerDay + nanoseconds);
}
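In other words, the first 8 bytes hold the nanoseconds elapsed within the day and the last 4 bytes hold the Julian day number. The layout can be checked with a standalone round trip. This is a rough sketch rather than Arrow's API: Int96Sketch, EncodeInt96 and DecodeInt96 are names invented here, the struct only mirrors the 12-byte layout quoted above, and the encoder assumes a non-negative Unix timestamp.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

// Stand-in for parquet::Int96: value[0..1] carry the nanoseconds within the
// day (8 bytes), value[2] carries the Julian day number (4 bytes).
struct Int96Sketch {
  uint32_t value[3];
};

constexpr int64_t kJulianToUnixEpochDays = INT64_C(2440588);
constexpr int64_t kNanosecondsPerDay = INT64_C(86400) * 1000 * 1000 * 1000;

// Hypothetical encoder: pack nanoseconds since the Unix epoch into INT96.
Int96Sketch EncodeInt96(int64_t unix_nanos) {
  assert(unix_nanos >= 0);  // assumption: pre-1970 values are not handled here
  Int96Sketch out{};
  const int64_t nanos_of_day = unix_nanos % kNanosecondsPerDay;
  std::memcpy(&out.value, &nanos_of_day, sizeof(nanos_of_day));
  out.value[2] = static_cast<uint32_t>(unix_nanos / kNanosecondsPerDay +
                                       kJulianToUnixEpochDays);
  return out;
}

// Same arithmetic as the quoted Int96GetNanoSeconds, done in unsigned math.
int64_t DecodeInt96(const Int96Sketch& i96) {
  const uint64_t days_since_epoch =
      i96.value[2] - static_cast<uint64_t>(kJulianToUnixEpochDays);
  uint64_t nanos_of_day = 0;
  std::memcpy(&nanos_of_day, &i96.value, sizeof(nanos_of_day));
  return static_cast<int64_t>(
      days_since_epoch * static_cast<uint64_t>(kNanosecondsPerDay) + nanos_of_day);
}
```

For example, 1583973827000000000 nanoseconds (the instant from the first answer's JSON dump, scaled from microseconds) encodes to Julian day 2458921, that is 2440588 + 18333 days since the epoch, with 2627 seconds' worth of nanoseconds in the low 8 bytes, and DecodeInt96 returns the original value.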