Skip to content
Snippets Groups Projects
Commit e14817b5 authored by Wenchen Fan's avatar Wenchen Fan Committed by Yin Huai
Browse files

[SPARK-12870][SQL] better format bucket id in file name

For a normal parquet file without a bucket, its file name ends with a jobUUID which may be all numbers and mistakenly regarded as a bucket id. This PR improves the format of the bucket id in the file name by using a different separator, `_`, so that the regex is more robust.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #10799 from cloud-fan/fix-bucket.
parent 0ddba6d8
No related branches found
No related tags found
No related merge requests found
......@@ -57,15 +57,21 @@ private[sql] abstract class BucketedOutputWriterFactory extends OutputWriterFact
private[sql] object BucketingUtils {
  // The file name of bucketed data should have 3 parts:
  //   1. some other information in the head of file name
  //   2. bucket id part, some numbers, starts with "_"
  //      * The other-information part may use `-` as separator and may have numbers at the end,
  //        e.g. a normal parquet file without bucketing may have name:
  //        part-r-00000-2dd664f9-d2c4-4ffe-878f-431234567891.gz.parquet, and we will mistakenly
  //        treat `431234567891` as bucket id. So here we pick `_` as separator.
  //   3. optional file extension part, in the tail of file name, starts with `.`
  // An example of bucketed parquet file name with bucket id 3:
  //   part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
  private val bucketedFileName = """.*_(\d+)(?:\..*)?$""".r

  /**
   * Extracts the bucket id from a bucketed file name, or returns `None` if the
   * name does not contain an underscore-prefixed numeric bucket id part.
   */
  def getBucketId(fileName: String): Option[Int] = fileName match {
    case bucketedFileName(bucketId) => Some(bucketId.toInt)
    case _ => None
  }

  /**
   * Formats a bucket id for embedding in a file name: an underscore separator
   * followed by the id zero-padded to 5 digits, e.g. `3` -> `"_00003"`.
   */
  def bucketIdToString(id: Int): String = f"_$id%05d"
}
......@@ -193,7 +193,7 @@ private[json] class JsonOutputWriter(
val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
val bucketString = bucketId.map(id => f"-$id%05d").getOrElse("")
val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
}
}.getRecordWriter(context)
......
......@@ -90,7 +90,7 @@ private[sql] class ParquetOutputWriter(
val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
val bucketString = bucketId.map(id => f"-$id%05d").getOrElse("")
val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
}
}
......
......@@ -103,7 +103,7 @@ private[orc] class OrcOutputWriter(
val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
val taskAttemptId = context.getTaskAttemptID
val partition = taskAttemptId.getTaskID.getId
val bucketString = bucketId.map(id => f"-$id%05d").getOrElse("")
val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
val filename = f"part-r-$partition%05d-$uniqueWriteJobId$bucketString.orc"
new OrcOutputFormat().getRecordWriter(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment