Skip to content

Commit

Permalink
[cdc] Fix Multibyte Character Encoding in MultiTableCommittable Seria…
Browse files Browse the repository at this point in the history
…lization

This closes #3028
  • Loading branch information
Pandas886 authored and JingsongLi committed May 4, 2024
1 parent 183c81d commit 075d774
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
* {@link SimpleVersionedSerializer} for {@link MultiTableCommittable}. If a type info class is
Expand Down Expand Up @@ -53,9 +54,9 @@ public int getVersion() {
public byte[] serialize(MultiTableCommittable committable) throws IOException {
// first serialize all metadata
String database = committable.getDatabase();
int databaseLen = database.length();
int databaseLen = database.getBytes(StandardCharsets.UTF_8).length;
String table = committable.getTable();
int tableLen = table.length();
int tableLen = table.getBytes(StandardCharsets.UTF_8).length;

int multiTableMetaLen = databaseLen + tableLen + 2 * 4;

Expand Down Expand Up @@ -83,17 +84,18 @@ public MultiTableCommittable deserialize(int committableVersion, byte[] bytes)
int databaseLen = buffer.getInt();
byte[] databaseBytes = new byte[databaseLen];
buffer.get(databaseBytes, 0, databaseLen);
String database = new String(databaseBytes);
String database = new String(databaseBytes, StandardCharsets.UTF_8);

int tableLen = buffer.getInt();
byte[] tableBytes = new byte[tableLen];
buffer.get(tableBytes, 0, tableLen);
String table = new String(tableBytes);
int multiTableMetaLen = databaseLen + tableLen + 2 * 4;
String table = new String(tableBytes, StandardCharsets.UTF_8);
int multiTableMetaLen = 4 + databaseLen + 4 + tableLen;

// use committable serializer (of the same version) to deserialize committable
byte[] serializedCommittable = new byte[bytes.length - multiTableMetaLen];

buffer.get(serializedCommittable, 0, bytes.length - multiTableMetaLen);
buffer.get(serializedCommittable, 0, serializedCommittable.length);
Committable committable = deserializeCommittable(committableVersion, serializedCommittable);

return MultiTableCommittable.fromCommittable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,19 @@
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.CommitMessageSerializer;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.function.Consumer;

import static org.apache.paimon.manifest.ManifestCommittableSerializerTest.randomCompactIncrement;
import static org.apache.paimon.manifest.ManifestCommittableSerializerTest.randomNewFilesIncrement;
import static org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertNotNull;

class MultiTableCommittableSerializerTest {
private final CommitMessageSerializer fileSerializer = new CommitMessageSerializer();
Expand All @@ -41,23 +46,76 @@ class MultiTableCommittableSerializerTest {
new MultiTableCommittableSerializer(fileSerializer);

@Test
public void testFileMetadata() throws IOException {
public void testDeserialize() throws IOException {
DataIncrement dataIncrement = randomNewFilesIncrement();
CompactIncrement compactIncrement = randomCompactIncrement();
CommitMessage commitMessage =
new CommitMessageImpl(row(0), 1, dataIncrement, compactIncrement);
Committable committable = new Committable(9, Committable.Kind.FILE, commitMessage);
String database = "database";
String table = "table";
MultiTableCommittable multiTableCommittable =
MultiTableCommittable.fromCommittable(
Identifier.create(database, table), committable);
MultiTableCommittable deserializeCommittable =
serializer.deserialize(2, serializer.serialize(multiTableCommittable));

assertThat(deserializeCommittable).isInstanceOf(MultiTableCommittable.class);

assertThat(deserializeCommittable.getDatabase()).isEqualTo(database);
assertThat(deserializeCommittable.getTable()).isEqualTo(table);

Lists.newArrayList(Tuple2.of("测试数据库", "用户信息表"), Tuple2.of("database", "table")).stream()
.forEach(
new Consumer<Tuple2<String, String>>() {
@Override
public void accept(Tuple2<String, String> stringStringTuple2) {
String database = stringStringTuple2.f0;
String table = stringStringTuple2.f1;
MultiTableCommittable multiTableCommittable =
MultiTableCommittable.fromCommittable(
Identifier.create(database, table), committable);
MultiTableCommittable deserializeCommittable = null;
try {
deserializeCommittable =
serializer.deserialize(
2, serializer.serialize(multiTableCommittable));
} catch (IOException e) {
throw new RuntimeException(e);
}

assertThat(deserializeCommittable)
.isInstanceOf(MultiTableCommittable.class);

assertThat(deserializeCommittable.getDatabase())
.isEqualTo(database);
assertThat(deserializeCommittable.getTable()).isEqualTo(table);
}
});
}

@Test
public void testSerialize() throws IOException {
DataIncrement newFilesIncrement = randomNewFilesIncrement();
CompactIncrement compactIncrement = randomCompactIncrement();
CommitMessage commitMessage =
new CommitMessageImpl(row(0), 1, newFilesIncrement, compactIncrement);
Committable committable = new Committable(9, Committable.Kind.FILE, commitMessage);

Lists.newArrayList(Tuple2.of("测试数据库", "用户信息表"), Tuple2.of("database", "table")).stream()
.forEach(
new Consumer<Tuple2<String, String>>() {
@Override
public void accept(Tuple2<String, String> stringStringTuple2) {
String database = stringStringTuple2.f0;
String table = stringStringTuple2.f1;

MultiTableCommittable multiTableCommittable =
MultiTableCommittable.fromCommittable(
Identifier.create(database, table), committable);

byte[] serializedData = null;
try {
serializedData = serializer.serialize(multiTableCommittable);
} catch (BufferOverflowException e) {
e.printStackTrace();
assert false : "Should not throw BufferOverflowException";
} catch (IOException e) {
e.printStackTrace();
assert false : "IOException occurred";
}

assertNotNull(
"The serialized data should not be null.", serializedData);
}
});
}
}

0 comments on commit 075d774

Please sign in to comment.