Skip to content

Commit

Permalink
code style
Browse files Browse the repository at this point in the history
  • Loading branch information
ClownXC committed May 12, 2024
1 parent ce2dba1 commit 2c327bf
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE;
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE_SIMPLE;
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ENCODING;
import static org.apache.seatunnel.common.exception.CommonErrorCode.VERSION_NOT_SUPPORTED;
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ROW_KIND;
import static org.apache.seatunnel.common.exception.CommonErrorCode.VERSION_NOT_SUPPORTED;
import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
OPERATION_NOT_SUPPORTED("COMMON-26", "<identifier> <operation> is unsupported."),
UNSUPPORTED_DATA_TYPE_SIMPLE("COMMON-27", "'<identifier>' unsupported data type '<dataType>'"),
UNSUPPORTED_ARRAY_GENERIC_TYPE(
"COMMON-28", "'<identifier>' array type not support genericType '<genericType>' of '<fieldName>'"),
"COMMON-28",
"'<identifier>' array type not support genericType '<genericType>' of '<fieldName>'"),
UNSUPPORTED_ROW_KIND(
"COMMON-29", "'<identifier>' table '<tableId>' not support rowKind '<rowKind>'");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ private RowConverter() {}
* @param dataType Data type of the array
* @return SeaTunnel array object
*/
public static Object convert(String fieldName, InternalArray array, SeaTunnelDataType<?> dataType) {
public static Object convert(
String fieldName, InternalArray array, SeaTunnelDataType<?> dataType) {
switch (dataType.getSqlType()) {
case STRING:
String[] strings = new String[array.size()];
Expand Down Expand Up @@ -122,7 +123,9 @@ public static Object convert(String fieldName, InternalArray array, SeaTunnelDat
return doubles;
default:
throw CommonError.unsupportedArrayGenericType(
PaimonConfig.CONNECTOR_IDENTITY, dataType.getSqlType().toString(), fieldName);
PaimonConfig.CONNECTOR_IDENTITY,
dataType.getSqlType().toString(),
fieldName);
}
}

Expand All @@ -133,7 +136,8 @@ public static Object convert(String fieldName, InternalArray array, SeaTunnelDat
* @param dataType SeaTunnel array data type
* @return Paimon array object {@link BinaryArray}
*/
public static BinaryArray reconvert(String fieldName, Object array, SeaTunnelDataType<?> dataType) {
public static BinaryArray reconvert(
String fieldName, Object array, SeaTunnelDataType<?> dataType) {
int length = ((Object[]) array).length;
BinaryArray binaryArray = new BinaryArray();
BinaryArrayWriter binaryArrayWriter;
Expand Down Expand Up @@ -221,7 +225,9 @@ public static BinaryArray reconvert(String fieldName, Object array, SeaTunnelDat
break;
default:
throw CommonError.unsupportedArrayGenericType(
PaimonConfig.CONNECTOR_IDENTITY, dataType.getSqlType().toString(), fieldName);
PaimonConfig.CONNECTOR_IDENTITY,
dataType.getSqlType().toString(),
fieldName);
}
binaryArrayWriter.complete();
return binaryArray;
Expand Down Expand Up @@ -293,7 +299,11 @@ public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunn
case ARRAY:
SeaTunnelDataType<?> arrayType = seaTunnelRowType.getFieldType(i);
InternalArray array = rowData.getArray(i);
objects[i] = convert(seaTunnelRowType.getFieldName(i), array, ((ArrayType<?, ?>) arrayType).getElementType());
objects[i] =
convert(
seaTunnelRowType.getFieldName(i),
array,
((ArrayType<?, ?>) arrayType).getElementType());
break;
case MAP:
String fieldName = seaTunnelRowType.getFieldName(i);
Expand Down Expand Up @@ -341,8 +351,8 @@ public static InternalRow reconvert(
BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow);
// Convert SeaTunnel RowKind to Paimon RowKind
org.apache.paimon.types.RowKind rowKind =
RowKindConverter.convertSeaTunnelRowKind2PaimonRowKind(seaTunnelRow.getTableId(),
seaTunnelRow.getRowKind());
RowKindConverter.convertSeaTunnelRowKind2PaimonRowKind(
seaTunnelRow.getTableId(), seaTunnelRow.getRowKind());
binaryRow.setRowKind(rowKind);
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
for (int i = 0; i < fieldTypes.length; i++) {
Expand Down Expand Up @@ -415,18 +425,23 @@ public static InternalRow reconvert(
binaryWriter.writeMap(
i,
BinaryMap.valueOf(
reconvert(fieldName, keys, keyType), reconvert(fieldName, values, valueType)),
reconvert(fieldName, keys, keyType),
reconvert(fieldName, values, valueType)),
new InternalMapSerializer(paimonKeyType, paimonValueType));
break;
case ARRAY:
ArrayType<?, ?> arrayType = (ArrayType<?, ?>) seaTunnelRowType.getFieldType(i);
BinaryArray paimonArray =
reconvert(fieldName, seaTunnelRow.getField(i), arrayType.getElementType());
reconvert(
fieldName,
seaTunnelRow.getField(i),
arrayType.getElementType());
binaryWriter.writeArray(
i,
paimonArray,
new InternalArraySerializer(
RowTypeConverter.reconvert(fieldName, arrayType.getElementType())));
RowTypeConverter.reconvert(
fieldName, arrayType.getElementType())));
break;
case ROW:
SeaTunnelDataType<?> rowType = seaTunnelRowType.getFieldType(i);
Expand All @@ -440,7 +455,8 @@ public static InternalRow reconvert(
default:
throw CommonError.unsupportedDataType(
PaimonConfig.CONNECTOR_IDENTITY,
seaTunnelRowType.getFieldType(i).getSqlType().toString(), fieldName);
seaTunnelRowType.getFieldType(i).getSqlType().toString(),
fieldName);
}
}
return binaryRow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ public class RowKindConverter {

/**
* Convert SeaTunnel RowKind {@link RowKind} to Paimon RowKind {@link InternalRow}
*
* @param tableId Table identifier
* @param seaTunnelRowKind The kind of change that a row describes in a changelog.
* @return
*/
public static org.apache.paimon.types.RowKind convertSeaTunnelRowKind2PaimonRowKind(
String tableId,
RowKind seaTunnelRowKind) {
String tableId, RowKind seaTunnelRowKind) {
switch (seaTunnelRowKind) {
case DELETE:
return org.apache.paimon.types.RowKind.DELETE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;

Expand Down Expand Up @@ -169,7 +168,8 @@ public static Column convert(BasicTypeDefine<DataType> typeDefine) {
if (seaTunnelDataType == null) {
throw CommonError.unsupportedArrayGenericType(
PaimonConfig.CONNECTOR_IDENTITY,
dataType.getTypeRoot().toString(), typeDefine.getName());
dataType.getTypeRoot().toString(),
typeDefine.getName());
}
break;
case MAP:
Expand All @@ -180,7 +180,9 @@ public static Column convert(BasicTypeDefine<DataType> typeDefine) {
break;
default:
throw CommonError.unsupportedDataType(
PaimonConfig.CONNECTOR_IDENTITY, dataType.asSQLString(), typeDefine.getName());
PaimonConfig.CONNECTOR_IDENTITY,
dataType.asSQLString(),
typeDefine.getName());
}
return physicalColumnBuilder.dataType(seaTunnelDataType).build();
}
Expand All @@ -197,14 +199,15 @@ public static RowType reconvert(SeaTunnelRowType seaTunnelRowType, TableSchema t
int totalFields = seaTunnelRowType.getTotalFields();
DataType[] dataTypes = new DataType[totalFields];
for (int i = 0; i < totalFields; i++) {
DataType dataType = SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(fieldNames[i], fieldTypes[i]);
DataType dataType =
SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(fieldNames[i], fieldTypes[i]);
dataTypes[i] = dataType;
}
List<DataField> fields = tableSchema.fields();
// DataType[] dataTypes =
// Arrays.stream(fieldTypes)
// .map(SeaTunnelTypeToPaimonVisitor.INSTANCE::visit)
// .toArray(DataType[]::new);
// DataType[] dataTypes =
// Arrays.stream(fieldTypes)
// .map(SeaTunnelTypeToPaimonVisitor.INSTANCE::visit)
// .toArray(DataType[]::new);
DataField[] dataFields = new DataField[dataTypes.length];
for (int i = 0; i < dataTypes.length; i++) {
DataType dataType = dataTypes[i];
Expand Down Expand Up @@ -233,6 +236,7 @@ public static BasicTypeDefine<DataType> reconvert(Column column) {

/**
* Mapping SeaTunnel data type {@link SeaTunnelDataType} to Paimon data type {@link DataType}
*
* @param fieldName SeaTunnel field name
* @param dataType SeaTunnel data type {@link SeaTunnelDataType}
* @return Paimon data type {@link DataType}
Expand Down Expand Up @@ -354,12 +358,16 @@ public DataType visit(String fieldName, SeaTunnelDataType<?> dataType) {
int totalFields = row.getTotalFields();
DataType[] dataTypes = new DataType[totalFields];
for (int i = 0; i < totalFields; i++) {
dataTypes[i] = SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(fieldNames[i], fieldTypes[i]);
dataTypes[i] =
SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(
fieldNames[i], fieldTypes[i]);
}
return DataTypes.ROW(dataTypes);
default:
throw CommonError.unsupportedDataType(
PaimonConfig.CONNECTOR_IDENTITY, dataType.getSqlType().toString(), fieldName);
PaimonConfig.CONNECTOR_IDENTITY,
dataType.getSqlType().toString(),
fieldName);
}
}
}
Expand Down

0 comments on commit 2c327bf

Please sign in to comment.