
[SUPPORT] java.lang.ClassCastException: class org.apache.spark.sql.catalyst.expressions.UnsafeRow cannot be cast to class org.apache.spark.sql.vectorized.ColumnarBatch #11106

Open
vicuna96 opened this issue Apr 26, 2024 · 8 comments
Labels: clazz-conflict (conflict of classes), spark (Issues related to spark)

vicuna96 commented Apr 26, 2024

Tips before filing an issue

  • Have you gone through our FAQs?

  • Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.

  • If you have triaged this as a bug, then file an issue directly.

Describe the problem you faced

Upgrading the Hudi version from 0.13.1 with metadata turned off to 0.14.1 with metadata turned on.

The first run went through fine and created the metadata table.

On the second run, I am facing the issue shown below.

To Reproduce

Steps to reproduce the behavior:

  1. Bootstrap a COW table with Hudi 0.13.1 and metadata turned off.
  2. Run an incremental ETL with Hudi 0.14.1 and metadata turned on, using org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload. This step succeeds, and the Hudi table version is successfully upgraded to 6.
  3. Run a subsequent incremental with Hudi 0.14.1 and metadata turned on, using the same payload class. This step fails.

Expected behavior

A partial update on the table that succeeds consistently, not only once.

Environment Description

  • Hudi version : 0.14.1

  • Spark version : 3.3.2

  • Hive version : 3.1.3

  • Hadoop version : 3.3.6

  • Storage (HDFS/S3/GCS..) : GCS

  • Running on Docker? (yes/no) : Dataproc

Additional context

The Hudi configurations are as follows:

hoodie.parquet.small.file.limit -> 4194304
hoodie.bloom.index.parallelism -> 256
hoodie.parquet.max.file.size -> 33554432
hoodie.partition.metafile.use.base.format -> true
hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled -> true
hoodie.parquet.block.size -> 16777216
hoodie.metadata.enable -> true
hoodie.datasource.write.drop.partition.columns -> true
hoodie.keep.max.commits -> 91
hoodie.upsert.shuffle.parallelism -> 256
hoodie.cleaner.commits.retained -> 77
hoodie.keep.min.commits -> 84
hoodie.global.bloom.index.parallelism -> 25
hoodie.datasource.write.precombine.field -> HUDI_PRECOMBINE_TS
hoodie.datasource.write.operation -> upsert
hoodie.datasource.write.recordkey.field -> ORDER_NUM,APPLN_VER_CD
hoodie.table.name -> OrderTable
hoodie.datasource.write.hive_style_partitioning -> true
hoodie.datasource.write.keygenerator.class -> org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.datasource.write.partitionpath.field -> ORDER_CREATE_UTC_DT,APPLN_VER_C
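
For reference, a minimal sketch of how the write described above would typically be invoked (Scala / Spark). The incrementalDf and basePath names are hypothetical placeholders, the payload class comes from the reproduction steps, and only a subset of the options listed above is shown:

import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch of the incremental upsert; `incrementalDf` and `basePath` are placeholders, not from the issue.
def upsertIncremental(incrementalDf: DataFrame, basePath: String): Unit = {
  val hudiOptions = Map(
    "hoodie.table.name" -> "OrderTable",
    "hoodie.datasource.write.operation" -> "upsert",
    "hoodie.datasource.write.recordkey.field" -> "ORDER_NUM,APPLN_VER_CD",
    "hoodie.datasource.write.partitionpath.field" -> "ORDER_CREATE_UTC_DT,APPLN_VER_C",
    "hoodie.datasource.write.precombine.field" -> "HUDI_PRECOMBINE_TS",
    "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload",
    "hoodie.metadata.enable" -> "true"
  )

  incrementalDf.write
    .format("hudi")
    .options(hudiOptions)
    .mode(SaveMode.Append)
    .save(basePath)
}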

Stacktrace

The error stack trace:

24/04/26 16:21:51 ERROR TableLoaderClass: 
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:70)
org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:114)
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:103)
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:142)
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:224)
org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:504)
org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:502)
org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	-- Application specific stack trace
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:739)

A prior warning was also found:

24/04/26 16:21:49 WARN TaskSetManager: Lost task 186.0 in stage 1.0 (TID 226) (gif-publish-incremental-sfo-sw-swwj.c.wmt-bfdms-opddev.internal executor 45): java.lang.ClassCastException: class org.apache.spark.sql.catalyst.expressions.UnsafeRow cannot be cast to class org.apache.spark.sql.vectorized.ColumnarBatch (org.apache.spark.sql.catalyst.expressions.UnsafeRow and org.apache.spark.sql.vectorized.ColumnarBatch are in unnamed module of loader 'app')
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.next(DataSourceScanExec.scala:579)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.next(DataSourceScanExec.scala:568)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator.isEmpty(Iterator.scala:387)
	at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.isEmpty(WholeStageCodegenExec.scala:758)
	at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$2(HoodieSparkUtils.scala:108)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.sql.execution.SQLConfInjectingRDD.compute(SQLConfInjectingRDD.scala:58)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

danny0405 (Contributor) commented:

It looks like a known issue reported in: #9305

vicuna96 (Author) commented:

Hi @danny0405, yes, it does look similar. However, the table was already running with Spark 3.3.2 and Hudi 0.13.1 without errors. The only changes here were upgrading the Hudi version to 0.14.1 and turning on metadata.

The casting also seems to be in the opposite direction, and the first run with Hudi 0.14.1 did not have metadata and succeeded. Do you think the issue is related to how the metadata table is saved? In other words, is metadata not supported with Spark 3.3.2?

Thanks for the help!

danny0405 (Contributor) commented:

Do you think the issue is related to how the metadata table is saved? In other words, is metadata not supported with Spark 3.3.2?

It is supported. Can you share your config options related to the metadata table?

vicuna96 (Author) commented:

Hi @danny0405, we are using defaults only; all Hudi configs we specify are listed above. Is there something we should configure specifically?

danny0405 (Contributor) commented:

I'm pretty sure it is a jar conflict. Can you check which jar contains the reported classes?
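
For reference, one way to check which jar actually supplied the classes on the cluster (a minimal sketch to run in spark-shell; the driver check uses plain JDK reflection, and the executor check assumes the default `sc` SparkContext):

// Driver side: print the jar each class was loaded from.
println(Class.forName("org.apache.spark.sql.catalyst.expressions.UnsafeRow")
  .getProtectionDomain.getCodeSource.getLocation)
println(Class.forName("org.apache.spark.sql.vectorized.ColumnarBatch")
  .getProtectionDomain.getCodeSource.getLocation)

// Executor side: driver and executor classpaths can differ, so repeat the check in a task.
sc.parallelize(Seq(1), 1).map { _ =>
  Class.forName("org.apache.spark.sql.vectorized.ColumnarBatch")
    .getProtectionDomain.getCodeSource.getLocation.toString
}.collect().foreach(println)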

ad1happy2go (Contributor) commented:

@vicuna96 How many columns are there in your dataset? If it's more than 100, did you try setting spark.sql.codegen.maxFields?
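
For reference, setting that property looks like this (a minimal sketch; 200 is only an example value, and the same property can be passed to spark-submit as --conf spark.sql.codegen.maxFields=200):

// Raise the whole-stage codegen field limit for the current session before the Hudi write.
spark.conf.set("spark.sql.codegen.maxFields", "200")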

vicuna96 (Author) commented May 2, 2024

Hi @danny0405, this seems to be in the spark-catalyst_2.12-3.3.2.jar package, but org.apache.spark.sql.catalyst.expressions.UnsafeRow does not extend org.apache.spark.sql.vectorized.ColumnarBatch. Is this expected in different versions?

Hi @ad1happy2go, I can give it a try, but the table should have fewer than 100 columns; also, this seems like a Spark property rather than a Hudi property, and the Spark version has not changed. I will update once I get a chance to test it.

codope added the spark (Issues related to spark) and clazz-conflict (conflict of classes) labels on May 2, 2024
ad1happy2go (Contributor) commented:

@vicuna96 Did you get a chance to test it out?
