
[HUDI-7778] Fixing global index for duplicate updates #11256

Merged: 1 commit into apache:master on May 25, 2024

Conversation

nsivabalan (Contributor) commented on May 18, 2024:

Change Logs

We occasionally see duplicate keys being ingested into the RLI (record-level index) partition in the metadata table (MDT). This patch fixes the root cause.

Root cause:
After fetching record locations from the RLI partition in the MDT, and before doing a snapshot read to honor the payload merge and ordering field, we fetch the unique {partition, fileId} pairs to read. Instead of deduplicating on the pair {partitionPath, fileId}, the code was using HoodieRecordGlobalLocation, which also contains the instantTime in addition to the partition path and fileId. Since the same file group can appear under multiple instant times, one incoming record could expand into 2, 3, or N records after the join.
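
To make the fan-out concrete, here is a minimal, self-contained sketch. The names (`DedupKeyDemo`, `Location`) are hypothetical stand-ins, not the actual Hudi types; it only illustrates how a dedup key that includes instantTime keeps both entries for the same file group, while keying on {partitionPath, fileId} collapses them:

```java
// Illustrative sketch only: simplified stand-ins for Hudi's location type
// (assumed names, not the actual Hudi classes; uses Java 16+ records).
import java.util.List;

public class DedupKeyDemo {

    // Simplified location whose equality includes instantTime, mirroring the
    // pre-fix dedup key derived from HoodieRecordGlobalLocation.
    record Location(String partitionPath, String instantTime, String fileId) {}

    public static void main(String[] args) {
        // Two index entries pointing at the SAME file group, written at two
        // different instants (e.g., before and after a compaction).
        List<Location> locations = List.of(
                new Location("2024/05/18", "001", "fg-1"),
                new Location("2024/05/18", "002", "fg-1"));

        // Pre-fix behavior: instantTime differs, so both entries survive
        // distinct(), and the later join fans one incoming record out into two.
        long byFullLocation = locations.stream().distinct().count();

        // Post-fix behavior: keying on {partitionPath, fileId} only collapses
        // the two entries into the single file group that must be read.
        long byPartitionAndFileId = locations.stream()
                .map(l -> List.of(l.partitionPath(), l.fileId()))
                .distinct()
                .count();

        System.out.println("distinct with instantTime:    " + byFullLocation);       // 2
        System.out.println("distinct {partition, fileId}: " + byPartitionAndFileId); // 1
    }
}
```

Running it prints 2 for the instantTime-inclusive key and 1 for the pair, which is exactly the 1-record-to-N expansion described above.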

I have written tests to reproduce the issue. Without the fix, we hit the exception below:


Caused by: org.apache.hudi.exception.HoodieAppendException: Failed while appending records to file:/var/folders/ym/8yjkm3n90kq8tk4gfmvk7y140000gn/T/junit2792173348364470678/.hoodie/metadata/record_index/.record-index-0009-0_00000000000000011.log.3_3-275-476
	at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:475)
	at org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:439)
	at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:90)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:355)
	... 28 more
Caused by: org.apache.hudi.exception.HoodieException: Writing multiple records with same key 1 not supported for org.apache.hudi.common.table.log.block.HoodieHFileDataBlock
	at org.apache.hudi.common.table.log.block.HoodieHFileDataBlock.serializeRecords(HoodieHFileDataBlock.java:146)
	at org.apache.hudi.common.table.log.block.HoodieDataBlock.getContentBytes(HoodieDataBlock.java:121)
	at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlocks(HoodieLogFormatWriter.java:166)
	at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:467)
	... 31 more

Driver stacktrace:
51301 [main] INFO  org.apache.spark.scheduler.DAGScheduler [] - Job 78 failed: collect at HoodieJavaRDD.java:177, took 0.245313 s
51303 [main] INFO  org.apache.hudi.client.BaseHoodieClient [] - Stopping Timeline service !!
51303 [main] INFO  org.apache.hudi.client.embedded.EmbeddedTimelineService [] - Closing Timeline server
51303 [main] INFO  org.apache.hudi.timeline.service.TimelineService [] - Closing Timeline Service
51321 [main] INFO  org.apache.hudi.timeline.service.TimelineService [] - Closed Timeline Service
51321 [main] INFO  org.apache.hudi.client.embedded.EmbeddedTimelineService [] - Closed Timeline server

org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 19700101000020000

	at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:80)
	at org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor.execute(SparkUpsertDeltaCommitActionExecutor.java:47)
	at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:98)
	at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:88)
	at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:156)
	at org.apache.hudi.functional.TestGlobalIndexEnableUpdatePartitions.testUdpateSubsetOfRecUpdates(TestGlobalIndexEnableUpdatePartitions.java:225)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92)
	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:212)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:192)
	at org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:139)
	at org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.lambda$execute$2(TestTemplateTestDescriptor.java:107)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$11$1.accept(ReferencePipeline.java:373)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.Streams$StreamBuilderImpl.forEachRemaining(Streams.java:419)
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:270)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:270)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:270)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:107)
	at org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:42)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
	at java.util.ArrayList.forEach(ArrayList.java:1257)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
	at java.util.ArrayList.forEach(ArrayList.java:1257)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
	at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
	at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: org.apache.hudi.exception.HoodieException: Failed to update metadata
	at org.apache.hudi.table.action.BaseActionExecutor.writeTableMetadata(BaseActionExecutor.java:72)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.commit(BaseSparkCommitActionExecutor.java:326)
	at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.autoCommit(BaseCommitActionExecutor.java:206)
	at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:188)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseSparkCommitActionExecutor.java:294)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:196)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:87)
	at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:74)
	... 129 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 275.0 failed 1 times, most recent failure: Lost task 3.0 in stage 275.0 (TID 476) (10.0.0.202 executor driver): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :3
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:362)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:272)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1508)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1418)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1482)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1305)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieAppendException: Failed while appending records to file:/var/folders/ym/8yjkm3n90kq8tk4gfmvk7y140000gn/T/junit2792173348364470678/.hoodie/metadata/record_index/.record-index-0009-0_00000000000000011.log.3_3-275-476
	at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:475)
	at org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:439)
	at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:90)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:355)
	... 28 more
Caused by: org.apache.hudi.exception.HoodieException: Writing multiple records with same key 1 not supported for org.apache.hudi.common.table.log.block.HoodieHFileDataBlock
	at org.apache.hudi.common.table.log.block.HoodieHFileDataBlock.serializeRecords(HoodieHFileDataBlock.java:146)
	at org.apache.hudi.common.table.log.block.HoodieDataBlock.getContentBytes(HoodieDataBlock.java:121)
	at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlocks(HoodieLogFormatWriter.java:166)
	at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:467)
	... 31 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
	at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
	at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
	at org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:177)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.setCommitMetadata(BaseSparkCommitActionExecutor.java:304)
	at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.autoCommit(BaseCommitActionExecutor.java:202)
	at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:188)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseSparkCommitActionExecutor.java:294)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:196)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:161)
	at org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor.execute(SparkUpsertPreppedDeltaCommitActionExecutor.java:44)
	at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:126)
	at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:88)
	at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:170)
	at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:66)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.commitInternal(HoodieBackedTableMetadataWriter.java:1101)
	at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:117)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:813)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.updateFromWriteStatuses(HoodieBackedTableMetadataWriter.java:868)
	at org.apache.hudi.table.action.BaseActionExecutor.writeTableMetadata(BaseActionExecutor.java:67)
	... 136 more
Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :3
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:362)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:272)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1508)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1418)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1482)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1305)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieAppendException: Failed while appending records to file:/var/folders/ym/8yjkm3n90kq8tk4gfmvk7y140000gn/T/junit2792173348364470678/.hoodie/metadata/record_index/.record-index-0009-0_00000000000000011.log.3_3-275-476
	at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:475)
	at org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:439)
	at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:90)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:355)
	... 28 more
Caused by: org.apache.hudi.exception.HoodieException: Writing multiple records with same key 1 not supported for org.apache.hudi.common.table.log.block.HoodieHFileDataBlock
	at org.apache.hudi.common.table.log.block.HoodieHFileDataBlock.serializeRecords(HoodieHFileDataBlock.java:146)
	at org.apache.hudi.common.table.log.block.HoodieDataBlock.getContentBytes(HoodieDataBlock.java:121)
	at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlocks(HoodieLogFormatWriter.java:166)
	at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:467)
	... 31 more

Impact

With this fix, global indexes (global bloom, global simple, and RLI) will no longer produce duplicate records.

Risk level

Low.



Review thread on the changed lines:

```diff
-                p.getRight().get().getPartitionPath(),
-                p.getRight().get().getInstantTime(),
-                p.getRight().get().getFileId()))
+        .map(p -> Pair.of(p.getRight().get().getPartitionPath(), p.getRight().get().getFileId()))
```
KnightChess (Contributor) commented:

LGTM, but can I ask a question? It looks like the instantTime can differ for the same fgId in HoodieRecordGlobalLocation; is this normal? The global simple index uses the base file's commit time as the instantTime, and there are no differing timestamps within the same file. So the question is: should HoodieRecordLocation's instantTime be the base file's last commit time (the one contained in the file name), or each record's commit time? Or maybe either one works and there is no problem.

nsivabalan (Contributor, Author) replied:

hey @KnightChess: at least for global simple, global bloom, and RLI, the instant time should not matter. HBase might have some intricacies with instant time to deduce rollbacks; I did not want to increase the scope of this patch. We are good w.r.t. global simple, global bloom, and RLI with the current fix.

KnightChess (Contributor) replied:

@nsivabalan thanks for answering

(A second review thread was opened on the same diff hunk as above.)
A Member commented:

Wondering if we can simply remove instantTime from the equals and hashCode of HoodieRecordGlobalLocation. Why would we need instantTime for a global location? Clustering will anyway map to a new fileId, and compaction will create a base file with the same fileId but a different instant time. For the location, though, all we care about is the partition path and file group ID, right? Instant time is only used for the filesystem view.
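
As an illustration of that point, here is a minimal sketch using a hypothetical GlobalLocation class, not the real HoodieRecordGlobalLocation, showing how an equals/hashCode that includes instantTime treats two references to the same file group as distinct:

```java
// Minimal sketch, assuming a simplified hypothetical GlobalLocation class:
// including instantTime in equals/hashCode makes two references to the same
// file group compare unequal (e.g., in a HashSet or a distinct() operation).
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class GlobalLocationEquality {

    static final class GlobalLocation {
        final String partitionPath;
        final String instantTime; // only needed for filesystem-view lookups
        final String fileId;

        GlobalLocation(String partitionPath, String instantTime, String fileId) {
            this.partitionPath = partitionPath;
            this.instantTime = instantTime;
            this.fileId = fileId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof GlobalLocation)) return false;
            GlobalLocation that = (GlobalLocation) o;
            // Including instantTime here is what the reviewer questions:
            // the identity of a location is really just {partitionPath, fileId}.
            return partitionPath.equals(that.partitionPath)
                    && instantTime.equals(that.instantTime)
                    && fileId.equals(that.fileId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(partitionPath, instantTime, fileId);
        }
    }

    public static void main(String[] args) {
        Set<GlobalLocation> set = new HashSet<>();
        // Same file group seen at two instants (e.g., before/after compaction):
        set.add(new GlobalLocation("p1", "001", "fg-1"));
        set.add(new GlobalLocation("p1", "002", "fg-1"));
        System.out.println(set.size()); // 2, although only one file group exists
    }
}
```

Dropping instantTime from equals/hashCode, or from the key entirely as this patch does at the call site, makes the two entries collapse to one.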

A Contributor replied:

I guess this is a legacy design meant to give quick access to the instant time from a given Hoodie record, but I agree with you: the location should not include the instant time. HoodieRecordLocation should also be fixed.

nsivabalan (Contributor, Author) replied:

Instant time is used in other places as well (and not just global indexing), so I do not want to include that in this patch. For now, let's proceed with the current fix for 0.15.0 and work towards properly fixing the RecordLocation.

A Member replied:

yeah let's track separately.


codope merged commit ddf63b5 into apache:master on May 25, 2024. 46 of 47 checks passed.
Labels: release-0.15.0, size:S (PR with lines of changes in (10, 100])