Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix][Connector-V2] Field information lost during Paimon DataType and SeaTunnel Column conversion #6767

Merged
merged 16 commits into from
May 23, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@
import static org.apache.seatunnel.common.exception.CommonErrorCode.JSON_OPERATION_FAILED;
import static org.apache.seatunnel.common.exception.CommonErrorCode.OPERATION_NOT_SUPPORTED;
import static org.apache.seatunnel.common.exception.CommonErrorCode.SQL_TEMPLATE_HANDLED_ERROR;
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ARRAY_GENERIC_TYPE;
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE;
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE_SIMPLE;
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ENCODING;
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ROW_KIND;
import static org.apache.seatunnel.common.exception.CommonErrorCode.VERSION_NOT_SUPPORTED;
import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR;

Expand Down Expand Up @@ -89,6 +92,14 @@ public static SeaTunnelRuntimeException writeSeaTunnelRowFailed(
return new SeaTunnelRuntimeException(WRITE_SEATUNNEL_ROW_ERROR, params, cause);
}

/**
 * Creates the exception raised for an unsupported data type when only the connector identifier
 * and the type name are supplied.
 *
 * @param identifier value stored under the {@code identifier} message parameter
 * @param dataType value stored under the {@code dataType} message parameter
 * @return a {@link SeaTunnelRuntimeException} built from {@code UNSUPPORTED_DATA_TYPE_SIMPLE}
 */
public static SeaTunnelRuntimeException unsupportedDataType(String identifier, String dataType) {
    // Keys correspond to the placeholders of the error-code message template.
    final Map<String, String> templateArgs = new HashMap<>();
    templateArgs.put("identifier", identifier);
    templateArgs.put("dataType", dataType);
    return new SeaTunnelRuntimeException(UNSUPPORTED_DATA_TYPE_SIMPLE, templateArgs);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please reuse `public static SeaTunnelRuntimeException unsupportedDataType(String identifier, String dataType, String field)`, because we should make sure the user knows which field is not supported.


public static SeaTunnelRuntimeException unsupportedDataType(
String identifier, String dataType, String field) {
Map<String, String> params = new HashMap<>();
Expand Down Expand Up @@ -216,4 +227,22 @@ public static SeaTunnelRuntimeException sqlTemplateHandledError(
params.put("optionName", optionName);
return new SeaTunnelRuntimeException(SQL_TEMPLATE_HANDLED_ERROR, params);
}

/**
 * Creates the exception raised when an array field has an element type that is not supported.
 *
 * <p>The {@code UNSUPPORTED_ARRAY_GENERIC_TYPE} description uses the placeholders
 * {@code <identifier>}, {@code <genericType>} and {@code <fieldName>}. The element type is
 * therefore registered under the {@code genericType} key, so every placeholder in the message
 * has a matching parameter.
 *
 * @param identifier value stored under the {@code identifier} message parameter
 * @param dataType name of the unsupported array element type, stored under {@code genericType}
 * @param fieldName value stored under the {@code fieldName} message parameter
 * @return a {@link SeaTunnelRuntimeException} built from {@code UNSUPPORTED_ARRAY_GENERIC_TYPE}
 */
public static SeaTunnelRuntimeException unsupportedArrayGenericType(
        String identifier, String dataType, String fieldName) {
    Map<String, String> params = new HashMap<>();
    params.put("identifier", identifier);
    // The template placeholder is <genericType>, not <dataType>.
    params.put("genericType", dataType);
    params.put("fieldName", fieldName);
    return new SeaTunnelRuntimeException(UNSUPPORTED_ARRAY_GENERIC_TYPE, params);
}

/**
 * Creates the exception raised when a row kind is not supported for a table.
 *
 * @param identifier value stored under the {@code identifier} message parameter
 * @param tableId value stored under the {@code tableId} message parameter
 * @param rowKind value stored under the {@code rowKind} message parameter
 * @return a {@link SeaTunnelRuntimeException} built from {@code UNSUPPORTED_ROW_KIND}
 */
public static SeaTunnelRuntimeException unsupportedRowKind(
        String identifier, String tableId, String rowKind) {
    // Keys correspond to the placeholders of the error-code message template.
    final Map<String, String> templateArgs = new HashMap<>();
    templateArgs.put("identifier", identifier);
    templateArgs.put("tableId", tableId);
    templateArgs.put("rowKind", rowKind);
    return new SeaTunnelRuntimeException(UNSUPPORTED_ROW_KIND, templateArgs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,17 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
WRITE_SEATUNNEL_ROW_ERROR(
"COMMON-23",
"<connector> write SeaTunnelRow failed, the SeaTunnelRow value is '<seaTunnelRow>'."),

SQL_TEMPLATE_HANDLED_ERROR(
"COMMON-24",
"The table of <tableName> has no <keyName>, but the template \n <template> \n which has the place holder named <placeholder>. Please use the option named <optionName> to specify sql template"),

VERSION_NOT_SUPPORTED("COMMON-25", "<identifier> <version> is unsupported."),

OPERATION_NOT_SUPPORTED("COMMON-26", "<identifier> <operation> is unsupported.");
OPERATION_NOT_SUPPORTED("COMMON-26", "<identifier> <operation> is unsupported."),
UNSUPPORTED_DATA_TYPE_SIMPLE("COMMON-27", "'<identifier>' unsupported data type '<dataType>'"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
UNSUPPORTED_DATA_TYPE_SIMPLE("COMMON-27", "'<identifier>' unsupported data type '<dataType>'"),

UNSUPPORTED_ARRAY_GENERIC_TYPE(
"COMMON-28",
"'<identifier>' array type not support genericType '<genericType>' of '<fieldName>'"),
UNSUPPORTED_ROW_KIND(
"COMMON-29", "'<identifier>' table '<tableId>' not support rowKind '<rowKind>'");

private final String code;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil;

Expand All @@ -35,6 +36,7 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -46,9 +48,9 @@
public class PaimonCatalog implements Catalog, PaimonTable {
private static final String DEFAULT_DATABASE = "default";

private String catalogName;
private PaimonSinkConfig paimonSinkConfig;
private PaimonCatalogLoader paimonCatalogLoader;
private final String catalogName;
private final PaimonSinkConfig paimonSinkConfig;
private final PaimonCatalogLoader paimonCatalogLoader;
private org.apache.paimon.catalog.Catalog catalog;

public PaimonCatalog(String catalogName, PaimonSinkConfig paimonSinkConfig) {
Expand Down Expand Up @@ -181,7 +183,12 @@ private CatalogTable toCatalogTable(
TableSchema.Builder builder = TableSchema.builder();
dataFields.forEach(
dataField -> {
Column column = SchemaUtil.toSeaTunnelType(dataField.type());
BasicTypeDefine.BasicTypeDefineBuilder<DataType> typeDefineBuilder =
BasicTypeDefine.<DataType>builder()
.name(dataField.name())
.comment(dataField.description())
.nativeType(dataField.type());
Column column = SchemaUtil.toSeaTunnelType(typeDefineBuilder.build());
builder.column(column);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
@Getter
public class PaimonConfig implements Serializable {

public static final String CONNECTOR_IDENTITY = "Paimon";

public static final Option<String> WAREHOUSE =
Options.key("warehouse")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.data;

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.converter.TypeConverter;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;

import org.apache.paimon.types.DataType;
Expand All @@ -29,21 +30,21 @@

@Slf4j
@AutoService(TypeConverter.class)
public class PaimonTypeMapper implements TypeConverter<DataType> {
public class PaimonTypeMapper implements TypeConverter<BasicTypeDefine<DataType>> {
public static final PaimonTypeMapper INSTANCE = new PaimonTypeMapper();

@Override
public String identifier() {
return PaimonSink.PLUGIN_NAME;
return PaimonConfig.CONNECTOR_IDENTITY;
}

@Override
public Column convert(DataType dataType) {
return RowTypeConverter.convert(dataType);
public Column convert(BasicTypeDefine<DataType> typeDefine) {
return RowTypeConverter.convert(typeDefine);
}

@Override
public DataType reconvert(Column column) {
public BasicTypeDefine<DataType> reconvert(Column column) {
return RowTypeConverter.reconvert(column);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;

import org.apache.paimon.data.BinaryArray;
import org.apache.paimon.data.BinaryArrayWriter;
Expand Down Expand Up @@ -68,7 +68,8 @@ private RowConverter() {}
* @param dataType Data type of the array
* @return SeaTunnel array object
*/
public static Object convert(InternalArray array, SeaTunnelDataType<?> dataType) {
public static Object convertArrayType(
String fieldName, InternalArray array, SeaTunnelDataType<?> dataType) {
switch (dataType.getSqlType()) {
case STRING:
String[] strings = new String[array.size()];
Expand Down Expand Up @@ -119,10 +120,10 @@ public static Object convert(InternalArray array, SeaTunnelDataType<?> dataType)
}
return doubles;
default:
String errorMsg =
String.format("Array type not support this genericType [%s]", dataType);
throw new PaimonConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
throw CommonError.unsupportedArrayGenericType(
PaimonConfig.CONNECTOR_IDENTITY,
dataType.getSqlType().toString(),
fieldName);
}
}

Expand All @@ -133,7 +134,8 @@ public static Object convert(InternalArray array, SeaTunnelDataType<?> dataType)
* @param dataType SeaTunnel array data type
* @return Paimon array object {@link BinaryArray}
*/
public static BinaryArray reconvert(Object array, SeaTunnelDataType<?> dataType) {
public static BinaryArray reconvert(
String fieldName, Object array, SeaTunnelDataType<?> dataType) {
int length = ((Object[]) array).length;
BinaryArray binaryArray = new BinaryArray();
BinaryArrayWriter binaryArrayWriter;
Expand Down Expand Up @@ -220,10 +222,10 @@ public static BinaryArray reconvert(Object array, SeaTunnelDataType<?> dataType)
}
break;
default:
String errorMsg =
String.format("Array type not support this genericType [%s]", dataType);
throw new PaimonConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
throw CommonError.unsupportedArrayGenericType(
PaimonConfig.CONNECTOR_IDENTITY,
dataType.getSqlType().toString(),
fieldName);
}
binaryArrayWriter.complete();
return binaryArray;
Expand All @@ -245,6 +247,7 @@ public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunn
continue;
}
SeaTunnelDataType<?> fieldType = seaTunnelRowType.getFieldType(i);
String fieldName = seaTunnelRowType.getFieldName(i);
switch (fieldType.getSqlType()) {
case TINYINT:
objects[i] = rowData.getByte(i);
Expand All @@ -265,12 +268,11 @@ public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunn
objects[i] = rowData.getDouble(i);
break;
case DECIMAL:
SeaTunnelDataType<?> decimalType = seaTunnelRowType.getFieldType(i);
Decimal decimal =
rowData.getDecimal(
i,
((DecimalType) decimalType).getPrecision(),
((DecimalType) decimalType).getScale());
((DecimalType) fieldType).getPrecision(),
((DecimalType) fieldType).getScale());
objects[i] = decimal.toBigDecimal();
break;
case STRING:
Expand All @@ -293,19 +295,21 @@ public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunn
objects[i] = timestamp.toLocalDateTime();
break;
case ARRAY:
SeaTunnelDataType<?> arrayType = seaTunnelRowType.getFieldType(i);
InternalArray array = rowData.getArray(i);
objects[i] = convert(array, ((ArrayType<?, ?>) arrayType).getElementType());
InternalArray paimonArray = rowData.getArray(i);
ArrayType<?, ?> seatunnelArray = (ArrayType<?, ?>) fieldType;
objects[i] =
convertArrayType(
fieldName, paimonArray, seatunnelArray.getElementType());
break;
case MAP:
SeaTunnelDataType<?> mapType = seaTunnelRowType.getFieldType(i);
MapType<?, ?> mapType = (MapType<?, ?>) fieldType;
InternalMap map = rowData.getMap(i);
InternalArray keyArray = map.keyArray();
InternalArray valueArray = map.valueArray();
SeaTunnelDataType<?> keyType = ((MapType<?, ?>) mapType).getKeyType();
SeaTunnelDataType<?> valueType = ((MapType<?, ?>) mapType).getValueType();
Object[] key = (Object[]) convert(keyArray, keyType);
Object[] value = (Object[]) convert(valueArray, valueType);
SeaTunnelDataType<?> keyType = mapType.getKeyType();
SeaTunnelDataType<?> valueType = mapType.getValueType();
Object[] key = (Object[]) convertArrayType(fieldName, keyArray, keyType);
Object[] value = (Object[]) convertArrayType(fieldName, valueArray, valueType);
Map<Object, Object> mapData = new HashMap<>();
for (int j = 0; j < key.length; j++) {
mapData.put(key[j], value[j]);
Expand All @@ -319,9 +323,10 @@ public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunn
objects[i] = convert(row, (SeaTunnelRowType) rowType);
break;
default:
throw new PaimonConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
"SeaTunnel does not support this type");
throw CommonError.unsupportedDataType(
PaimonConfig.CONNECTOR_IDENTITY,
fieldType.getSqlType().toString(),
fieldName);
}
}
return new SeaTunnelRow(objects);
Expand All @@ -343,6 +348,12 @@ public static InternalRow reconvert(
// Convert SeaTunnel RowKind to Paimon RowKind
org.apache.paimon.types.RowKind rowKind =
RowKindConverter.convertSeaTunnelRowKind2PaimonRowKind(seaTunnelRow.getRowKind());
// Treat a null conversion result as an unsupported row kind and fail before writing the row.
if (rowKind == null) {
    // Argument order must follow unsupportedRowKind(identifier, tableId, rowKind);
    // passing them the other way round swaps the table and row kind in the message.
    throw CommonError.unsupportedRowKind(
            PaimonConfig.CONNECTOR_IDENTITY,
            seaTunnelRow.getTableId(),
            seaTunnelRow.getRowKind().shortString());
}
binaryRow.setRowKind(rowKind);
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
for (int i = 0; i < fieldTypes.length; i++) {
Expand All @@ -351,6 +362,7 @@ public static InternalRow reconvert(
binaryWriter.setNullAt(i);
continue;
}
String fieldName = seaTunnelRowType.getFieldName(i);
switch (fieldTypes[i].getSqlType()) {
case TINYINT:
binaryWriter.writeByte(i, (Byte) seaTunnelRow.getField(i));
Expand Down Expand Up @@ -396,7 +408,6 @@ public static InternalRow reconvert(
.setValue(binaryWriter, i, DateTimeUtils.toInternal(date));
break;
case TIMESTAMP:
String fieldName = seaTunnelRowType.getFieldName(i);
DataField dataField = SchemaUtil.getDataField(fields, fieldName);
int precision = ((TimestampType) dataField.type()).getPrecision();
LocalDateTime datetime = (LocalDateTime) seaTunnelRow.getField(i);
Expand All @@ -407,26 +418,31 @@ public static InternalRow reconvert(
MapType<?, ?> mapType = (MapType<?, ?>) seaTunnelRowType.getFieldType(i);
SeaTunnelDataType<?> keyType = mapType.getKeyType();
SeaTunnelDataType<?> valueType = mapType.getValueType();
DataType paimonKeyType = RowTypeConverter.reconvert(keyType);
DataType paimonValueType = RowTypeConverter.reconvert(valueType);
DataType paimonKeyType = RowTypeConverter.reconvert(fieldName, keyType);
DataType paimonValueType = RowTypeConverter.reconvert(fieldName, valueType);
Map<?, ?> field = (Map<?, ?>) seaTunnelRow.getField(i);
Object[] keys = field.keySet().toArray(new Object[0]);
Object[] values = field.values().toArray(new Object[0]);
binaryWriter.writeMap(
i,
BinaryMap.valueOf(
reconvert(keys, keyType), reconvert(values, valueType)),
reconvert(fieldName, keys, keyType),
reconvert(fieldName, values, valueType)),
new InternalMapSerializer(paimonKeyType, paimonValueType));
break;
case ARRAY:
ArrayType<?, ?> arrayType = (ArrayType<?, ?>) seaTunnelRowType.getFieldType(i);
BinaryArray paimonArray =
reconvert(seaTunnelRow.getField(i), arrayType.getElementType());
reconvert(
fieldName,
seaTunnelRow.getField(i),
arrayType.getElementType());
binaryWriter.writeArray(
i,
paimonArray,
new InternalArraySerializer(
RowTypeConverter.reconvert(arrayType.getElementType())));
RowTypeConverter.reconvert(
fieldName, arrayType.getElementType())));
break;
case ROW:
SeaTunnelDataType<?> rowType = seaTunnelRowType.getFieldType(i);
Expand All @@ -438,9 +454,10 @@ public static InternalRow reconvert(
binaryWriter.writeRow(i, paimonRow, new InternalRowSerializer(paimonRowType));
break;
default:
throw new PaimonConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
"Unsupported data type " + seaTunnelRowType.getFieldType(i));
throw CommonError.unsupportedDataType(
PaimonConfig.CONNECTOR_IDENTITY,
seaTunnelRowType.getFieldType(i).getSqlType().toString(),
fieldName);
}
}
return binaryRow;
Expand Down