[AURON #1994] Support HiveTableScanExec to native #1995
guixiaowen wants to merge 2 commits into apache:master
Conversation
override def isEnabled: Boolean =
  getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true)

def enableHiveTableScanExec: Boolean =
  getBooleanConf("spark.auron.enable.hiveTableScanExec", defaultValue = false)
Should we be adding these to @SparkAuronConfiguration.java?
override def isSupported(exec: SparkPlan): Boolean =
  exec match {
    case e: HiveTableScanExec if enableHiveTableScanExec &&
        e.relation.tableMeta.provider.isDefined &&
        e.relation.tableMeta.provider.get.equals("hive") =>
      true
    case _ => false
  }
It's slightly hard to distinguish when to use isSupported vs. the enableHiveTableScanExec flag. Do you mind adding documentation here?
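For example, a scaladoc sketch along these lines might be enough; the wording is illustrative and only restates what the two config keys in this diff already do:

/** Whether the Hive conversion provider is enabled at all
 *  (spark.auron.enable.hiveTable, default true). */
override def isEnabled: Boolean =
  getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true)

/** Whether HiveTableScanExec specifically may be converted to a native scan
 *  (spark.auron.enable.hiveTableScanExec, default false). */
def enableHiveTableScanExec: Boolean =
  getBooleanConf("spark.auron.enable.hiveTableScanExec", defaultValue = false)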
.newBuilder()
.setBaseConf(nativeFileScanConf)
.setFsResourceId(resourceId)
.addAllPruningPredicates(new java.util.ArrayList()) // not support this filter
So we will do a full table scan here? If that's the case, do you mind creating an issue and linking it here?
Are there tests we can add to verify conversion behavior here?
Pull request overview
This PR aims to add native execution support for Spark HiveTableScanExec by introducing a Hive-specific conversion provider and a new native scan exec implementation, plus updating build dependencies to compile against Spark Hive classes.
Changes:
- Add HiveConvertProvider and a new NativeHiveTableScanExec implementation intended to convert HiveTableScanExec to native ORC/Parquet scans.
- Update spark-extension-shims-spark Maven dependencies to include Spark Hive/Catalyst APIs needed by the new shims code.
- Refactor/import changes in AuronConverters, and add a (currently unimplemented) convertHiveTableScanExec stub.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 19 comments.
| File | Description |
|---|---|
| spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala | Import refactor; adds an unimplemented convertHiveTableScanExec method; currently introduces compile issues (missing import + stub). |
| spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala | New native Hive table scan exec; currently contains multiple compile/runtime issues (API mismatches, pattern matches, session usage, unsupported formats). |
| spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala | New ServiceLoader-based conversion provider for Hive scans; currently missing ServiceLoader registration and insufficient support checks. |
| spark-extension-shims-spark/pom.xml | Adds Spark Hive/Catalyst dependencies, but introduces duplicate spark-hive dependency entries with conflicting scopes. |
override def isSupported(exec: SparkPlan): Boolean =
  exec match {
    case e: HiveTableScanExec if enableHiveTableScanExec &&
        e.relation.tableMeta.provider.isDefined &&
        e.relation.tableMeta.provider.get.equals("hive") =>
      true
isSupported accepts all Hive tables with provider == "hive", but NativeHiveTableScanExec only builds native nodes for ORC/Parquet and otherwise will throw (e.g., MatchError on file format). Add an explicit format check here (or make the native exec gracefully fall back) to avoid runtime failures on non-ORC/Parquet Hive tables.
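A minimal sketch of that check, assuming the input format class name is reachable via the standard CatalogTable accessor e.relation.tableMeta.storage.inputFormat; the isSupportedFormat helper name is invented here:

// Restrict conversion to Hive tables whose input format is ORC or Parquet.
private def isSupportedFormat(e: HiveTableScanExec): Boolean = {
  val supportedFormats = Set(
    "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
    "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")
  // CatalogStorageFormat.inputFormat is an Option[String] holding the input format class name.
  e.relation.tableMeta.storage.inputFormat.exists(supportedFormats.contains)
}

override def isSupported(exec: SparkPlan): Boolean = exec match {
  case e: HiveTableScanExec if enableHiveTableScanExec &&
      e.relation.tableMeta.provider.contains("hive") &&
      isSupportedFormat(e) =>
    true
  case _ => false
}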
nativeTable.getMetadata)

@transient private lazy val nativeHadoopConf = {
  val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf()
nativeHadoopConf uses SparkSession.getActiveSession.get, which can throw if there is no active session (e.g., execution triggered outside a SQL context). Consider using the same session derivation as NativeHiveTableScanBase.broadcastedHadoopConf (Shims.get.getSqlContext(basedHiveScan).sparkSession) to avoid runtime failures.
Suggested change:
- val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf()
+ val sparkSession = Shims.get.getSqlContext(basedHiveScan).sparkSession
+ val hiveConf = sparkSession.sessionState.newHadoopConf()
private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) {
  0 // will splitted based on block by default.
} else {
  math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
    SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
minPartitions reads SparkSession.getActiveSession.get.sparkContext multiple times. Besides the .get risk, it’s also inconsistent with other native scan implementations in this repo that pass an explicit sparkSession around. Prefer using a single sparkSession resolved from basedHiveScan and derive sparkContext from it.
private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) {
  0 // will splitted based on block by default.
} else {
  math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
    SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
}

private val ignoreEmptySplits =
  SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
ignoreEmptySplits also depends on SparkSession.getActiveSession.get. This should use the same non-optional session/context resolution as the rest of the execution code to avoid NoSuchElementException when there is no active session.
Suggested change:
- private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) {
-   0 // will splitted based on block by default.
- } else {
-   math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
-     SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
- }
- private val ignoreEmptySplits =
-   SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
+ private val minPartitions = if (sparkContext.isLocal) {
+   0 // will splitted based on block by default.
+ } else {
+   math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
+     sparkContext.defaultMinPartitions)
+ }
+ private val ignoreEmptySplits =
+   sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
private def getInputFormat(conf: JobConf, inputFormatClass: Class[newInputClass[Writable, Writable]]):
    InputFormat[Writable, Writable] = {
  val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
    .asInstanceOf[InputFormat[Writable, Writable]]
  newInputFormat match {
getInputFormat takes a mapreduce.InputFormat class (newInputClass) but returns/instantiates org.apache.hadoop.mapred.InputFormat. This signature mismatch makes the unchecked cast even riskier. Align the parameter type with the returned InputFormat type (or vice versa) so the compiler can help enforce correctness.
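One way to align the types, sketched against the mapred API the method actually instantiates; the Configurable branch mirrors Spark's HadoopRDD.getInputFormat and is an assumption about the intended behavior:

import org.apache.hadoop.conf.Configurable
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{InputFormat, JobConf}
import org.apache.hadoop.util.ReflectionUtils

private def getInputFormat(
    conf: JobConf,
    inputFormatClass: Class[_ <: InputFormat[Writable, Writable]]): InputFormat[Writable, Writable] = {
  // Instantiate the mapred InputFormat directly; no cross-API cast is needed.
  val newInputFormat: InputFormat[Writable, Writable] =
    ReflectionUtils.newInstance(inputFormatClass, conf)
  newInputFormat match {
    case configurable: Configurable => configurable.setConf(conf) // propagate the JobConf
    case _ =>
  }
  newInputFormat
}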
    getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
  arrayFilePartition.toArray
}

private def getMaxSplitBytes(sparkSession: SparkSession): Long = {
  val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
  val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
  Math.min(defaultMaxSplitBytes, openCostInBytes)
getMaxSplitBytes currently returns min(filesMaxPartitionBytes, filesOpenCostInBytes), which can drastically shrink splits and create excessive partitions. Elsewhere in this repo (NativePaimonTableScanExec) you fork Spark’s FilePartition#maxSplitBytes logic using min(defaultMaxSplitBytes, max(openCostInBytes, bytesPerCore)). Align this implementation to that logic (or call the shared helper) to avoid performance regressions.
Suggested change:
-   getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
-   arrayFilePartition.toArray
- }
- private def getMaxSplitBytes(sparkSession: SparkSession): Long = {
-   val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
-   val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
-   Math.min(defaultMaxSplitBytes, openCostInBytes)
+   getMaxSplitBytes(SparkSession.getActiveSession.get, partitionedFiles)).toArray
+   arrayFilePartition.toArray
+ }
+ private def getMaxSplitBytes(
+     sparkSession: SparkSession,
+     partitionedFiles: Seq[PartitionedFile]): Long = {
+   val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
+   val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
+   val totalBytes = partitionedFiles.map(_.length).sum
+   val parallelism = math.max(1, sparkSession.sparkContext.defaultParallelism)
+   val bytesPerCore = if (totalBytes <= 0L) openCostInBytes else totalBytes / parallelism
+   Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
class HiveConvertProvider extends AuronConvertProvider with Logging {
  override def isEnabled: Boolean =
    getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true)

  def enableHiveTableScanExec: Boolean =
    getBooleanConf("spark.auron.enable.hiveTableScanExec", defaultValue = false)
HiveConvertProvider is discovered via ServiceLoader (AuronConverters loads AuronConvertProvider implementations). This module currently doesn’t include a META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider entry, so the provider won’t be loaded at runtime (see thirdparty/auron-paimon for the existing pattern).
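For reference, the registration is a single resource file named after the service interface and containing the provider's fully qualified class name; the package below is inferred from the file path listed in this PR, so please verify it:

spark-extension-shims-spark/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider
  org.apache.spark.sql.hive.execution.auron.plan.HiveConvertProvider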
def convertHiveTableScanExec(hiveExec: HiveTableScanExec): SparkPlan = {
  AuronConverters.addRenameColumnsExec(NativeHiveTableScanExec(hiveExec))
}
There are existing query/operator validation test utilities in this module, but this new Hive scan conversion path doesn’t appear to have coverage. Add at least one suite that enables spark.auron.enable.hiveTableScanExec and verifies HiveTableScanExec is converted (and that unsupported formats don’t break execution).
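A rough shape for such a suite, assuming ScalaTest 3.1+ and that the Auron extensions are wired into the session the same way the module's existing suites do; the suite name, table DDL, and plan-string assertion are illustrative:

import org.apache.spark.sql.SparkSession
import org.scalatest.funsuite.AnyFunSuite

class NativeHiveTableScanConversionSuite extends AnyFunSuite {
  test("HiveTableScanExec on an ORC table converts to the native scan when enabled") {
    val spark = SparkSession.builder()
      .master("local[1]")
      .config("spark.auron.enable.hiveTableScanExec", "true")
      .enableHiveSupport()
      .getOrCreate()
    try {
      spark.sql("CREATE TABLE IF NOT EXISTS auron_test_hive_orc (id INT) STORED AS ORC")
      val plan = spark.sql("SELECT id FROM auron_test_hive_orc").queryExecution.executedPlan
      // Expect the Hive scan to have been replaced by the native implementation.
      assert(plan.toString.contains("NativeHiveTableScan"))
    } finally {
      spark.stop()
    }
  }
}

A companion case for a non-ORC/Parquet table (e.g. STORED AS TEXTFILE) could then assert that the query still executes instead of failing at conversion time.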
arrayFilePartition += FilePartition.getFilePartitions(SparkSession.getActiveSession.get,
  partitionedFiles,
  getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
FilePartition.getFilePartitions(SparkSession.getActiveSession.get, ...) again relies on getActiveSession.get. Use the sparkSession derived from basedHiveScan (as in NativeHiveTableScanBase) so partition planning doesn’t fail when there’s no active session.
Suggested change:
- arrayFilePartition += FilePartition.getFilePartitions(SparkSession.getActiveSession.get,
-   partitionedFiles,
-   getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
+ val sparkSession = basedHiveScan.sparkSession
+ arrayFilePartition += FilePartition.getFilePartitions(
+   sparkSession,
+   partitionedFiles,
+   getMaxSplitBytes(sparkSession)
+ ).toArray
inputFormatClass match {
  case OrcInputFormat =>
  case MapredParquetInputFormat =>
  case _ =>
}
inputFormatClass match { case OrcInputFormat => ... } is matching a Class[_] value against a class name, and the cases are empty. If you need special handling by input format, compare against classOf[OrcInputFormat] / classOf[MapredParquetInputFormat] and implement the intended behavior; otherwise remove this dead code block.
Suggested change (remove the dead block):
- inputFormatClass match {
-   case OrcInputFormat =>
-   case MapredParquetInputFormat =>
-   case _ =>
- }
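If format-specific handling is actually needed rather than removal, a classOf-based comparison could look like the sketch below; the branch bodies are placeholders for whatever per-format behavior is intended, and throwing on unsupported formats is only one option:

import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat

// Compare the Class[_] value against concrete classes instead of matching on bare names.
if (inputFormatClass == classOf[OrcInputFormat]) {
  // ORC-specific handling goes here
} else if (inputFormatClass == classOf[MapredParquetInputFormat]) {
  // Parquet-specific handling goes here
} else {
  throw new UnsupportedOperationException(s"Unsupported Hive input format: ${inputFormatClass.getName}")
}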
Which issue does this PR close?
Closes #1994
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?
How was this patch tested?