Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix][Connector-V2] Field information lost during Paimon DataType and SeaTunnel Column conversion #6767

Merged
merged 16 commits into from
May 23, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil;

Expand All @@ -35,6 +36,7 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -46,9 +48,9 @@
public class PaimonCatalog implements Catalog, PaimonTable {
private static final String DEFAULT_DATABASE = "default";

private String catalogName;
private PaimonSinkConfig paimonSinkConfig;
private PaimonCatalogLoader paimonCatalogLoader;
private final String catalogName;
private final PaimonSinkConfig paimonSinkConfig;
private final PaimonCatalogLoader paimonCatalogLoader;
private org.apache.paimon.catalog.Catalog catalog;

public PaimonCatalog(String catalogName, PaimonSinkConfig paimonSinkConfig) {
Expand Down Expand Up @@ -181,7 +183,12 @@ private CatalogTable toCatalogTable(
TableSchema.Builder builder = TableSchema.builder();
dataFields.forEach(
dataField -> {
Column column = SchemaUtil.toSeaTunnelType(dataField.type());
BasicTypeDefine.BasicTypeDefineBuilder<DataType> typeDefineBuilder =
BasicTypeDefine.<DataType>builder()
.name(dataField.name())
.comment(dataField.description())
.nativeType(dataField.type());
Column column = SchemaUtil.toSeaTunnelType(typeDefineBuilder.build());
builder.column(column);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
@Getter
public class PaimonConfig implements Serializable {

public static final String CONNECTOR_IDENTITY = "Paimon";

public static final Option<String> WAREHOUSE =
Options.key("warehouse")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.data;

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.converter.TypeConverter;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;

import org.apache.paimon.types.DataType;
Expand All @@ -29,21 +30,21 @@

@Slf4j
@AutoService(TypeConverter.class)
public class PaimonTypeMapper implements TypeConverter<DataType> {
public class PaimonTypeMapper implements TypeConverter<BasicTypeDefine<DataType>> {
public static final PaimonTypeMapper INSTANCE = new PaimonTypeMapper();

@Override
public String identifier() {
return PaimonSink.PLUGIN_NAME;
return PaimonConfig.CONNECTOR_IDENTITY;
}

@Override
public Column convert(DataType dataType) {
return RowTypeConverter.convert(dataType);
public Column convert(BasicTypeDefine<DataType> typeDefine) {
return RowTypeConverter.convert(typeDefine);
}

@Override
public DataType reconvert(Column column) {
public BasicTypeDefine<DataType> reconvert(Column column) {
return RowTypeConverter.reconvert(column);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;

Expand Down Expand Up @@ -320,7 +321,7 @@ public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunn
break;
default:
throw new PaimonConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please throw the common error by using this method:

public static SeaTunnelRuntimeException unsupportedDataType(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please throw the common error by using this method:

public static SeaTunnelRuntimeException unsupportedDataType(

done.

"SeaTunnel does not support this type");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;

Expand Down Expand Up @@ -80,11 +82,20 @@ public static SeaTunnelRowType convert(RowType rowType) {
/**
* Convert Paimon row type {@link DataType} to SeaTunnel row type {@link SeaTunnelDataType}
*
* @param dataType Paimon data type
* @param typeDefine Paimon data type
* @return SeaTunnel data type {@link SeaTunnelDataType}
*/
public static Column convert(DataType dataType) {
PhysicalColumn.PhysicalColumnBuilder physicalColumnBuilder = PhysicalColumn.builder();
public static Column convert(BasicTypeDefine<DataType> typeDefine) {

PhysicalColumn.PhysicalColumnBuilder physicalColumnBuilder =
PhysicalColumn.builder()
.name(typeDefine.getName())
.sourceType(typeDefine.getColumnType())
.nullable(typeDefine.isNullable())
.defaultValue(typeDefine.getDefaultValue())
.comment(typeDefine.getComment());

DataType dataType = typeDefine.getNativeType();
SeaTunnelDataType<?> seaTunnelDataType;
PaimonToSeaTunnelTypeVisitor paimonToSeaTunnelTypeVisitor =
PaimonToSeaTunnelTypeVisitor.INSTANCE;
Expand Down Expand Up @@ -168,8 +179,7 @@ public static Column convert(DataType dataType) {
String.format(
"Paimon dataType not support this genericType [%s]",
dataType.asSQLString());
throw new PaimonConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
throw new PaimonConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
}
return physicalColumnBuilder.dataType(seaTunnelDataType).build();
}
Expand Down Expand Up @@ -209,7 +219,7 @@ public static RowType reconvert(SeaTunnelRowType seaTunnelRowType, TableSchema t
* @param column SeaTunnel data type {@link Column}
* @return Paimon data type {@link DataType}
*/
public static DataType reconvert(Column column) {
public static BasicTypeDefine<DataType> reconvert(Column column) {
return SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(column);
}

Expand All @@ -234,18 +244,53 @@ private static class SeaTunnelTypeToPaimonVisitor {

private SeaTunnelTypeToPaimonVisitor() {}

public DataType visit(Column column) {
public BasicTypeDefine<DataType> visit(Column column) {
BasicTypeDefine.BasicTypeDefineBuilder<DataType> builder =
BasicTypeDefine.<DataType>builder()
.name(column.getName())
.nullable(column.isNullable())
.comment(column.getComment())
.defaultValue(column.getDefaultValue());
SeaTunnelDataType<?> dataType = column.getDataType();
Integer scale = column.getScale();
switch (dataType.getSqlType()) {
case TIMESTAMP:
return DataTypes.TIMESTAMP(
Objects.isNull(scale) ? TimestampType.DEFAULT_PRECISION : scale);
int timestampScale =
Objects.isNull(scale) ? TimestampType.DEFAULT_PRECISION : scale;
TimestampType timestampType = DataTypes.TIMESTAMP(timestampScale);
builder.nativeType(timestampType);
builder.dataType(timestampType.getTypeRoot().name());
builder.columnType(timestampType.toString());
builder.scale(timestampScale);
builder.length(column.getColumnLength());
return builder.build();
case TIME:
return DataTypes.TIME(
Objects.isNull(scale) ? TimeType.DEFAULT_PRECISION : scale);
int timeScale = Objects.isNull(scale) ? TimeType.DEFAULT_PRECISION : scale;
TimeType timeType = DataTypes.TIME(timeScale);
builder.nativeType(timeType);
builder.columnType(timeType.toString());
builder.dataType(timeType.getTypeRoot().name());
builder.scale(timeScale);
builder.length(column.getColumnLength());
return builder.build();
case DECIMAL:
int precision =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In some cases, the precision and scale obtained here may be 0, and you must handle this situation to achieve better compatibility.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In some cases, the precision and scale obtained here may be 0, and you must handle this situation to achieve better compatibility.

done.

((org.apache.seatunnel.api.table.type.DecimalType) dataType)
.getPrecision();
DecimalType decimalType = DataTypes.DECIMAL(precision, scale);
builder.nativeType(decimalType);
builder.columnType(decimalType.toString());
builder.dataType(decimalType.getTypeRoot().name());
builder.scale(scale);
builder.precision((long) precision);
builder.length(column.getColumnLength());
return builder.build();
default:
return visit(dataType);
builder.nativeType(visit(dataType));
builder.columnType(dataType.toString());
builder.length(column.getColumnLength());
builder.dataType(dataType.getSqlType().name());
return builder.build();
}
}

Expand Down Expand Up @@ -302,7 +347,7 @@ public DataType visit(SeaTunnelDataType<?> dataType) {
return DataTypes.ROW(dataTypes);
default:
throw new PaimonConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unsupported data type: " + dataType.getSqlType());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.data.PaimonTypeMapper;

Expand All @@ -34,7 +35,8 @@
public class SchemaUtil {

public static DataType toPaimonType(Column column) {
return PaimonTypeMapper.INSTANCE.reconvert(column);
BasicTypeDefine<DataType> basicTypeDefine = PaimonTypeMapper.INSTANCE.reconvert(column);
return basicTypeDefine.getNativeType();
}

public static Schema toPaimonSchema(
Expand Down Expand Up @@ -62,8 +64,8 @@ public static Schema toPaimonSchema(
return paiSchemaBuilder.build();
}

public static Column toSeaTunnelType(DataType dataType) {
return PaimonTypeMapper.INSTANCE.convert(dataType);
public static Column toSeaTunnelType(BasicTypeDefine<DataType> typeDefine) {
return PaimonTypeMapper.INSTANCE.convert(typeDefine);
}

public static DataField getDataField(List<DataField> fields, String fieldName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.seatunnel.connectors.seatunnel.paimon.utils;

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
Expand Down Expand Up @@ -46,6 +49,10 @@ public class RowTypeConverterTest {

private RowType rowType;

private BasicTypeDefine<DataType> typeDefine;

private Column column;

private TableSchema tableSchema;

public static final RowType DEFAULT_ROW_TYPE =
Expand Down Expand Up @@ -148,10 +155,39 @@ public void before() {
KEY_NAME_LIST,
Collections.EMPTY_MAP,
"");

typeDefine =
BasicTypeDefine.<DataType>builder()
.name("c_decimal")
.comment("c_decimal_type_define")
.columnType("DECIMAL(30, 8)")
.nativeType(DataTypes.DECIMAL(30, 8))
.dataType(DataTypes.DECIMAL(30, 8).toString())
.length(30L)
.precision(30L)
.scale(8)
.defaultValue(3.0)
.nullable(false)
.build();

org.apache.seatunnel.api.table.type.DecimalType dataType =
new org.apache.seatunnel.api.table.type.DecimalType(30, 8);

column =
PhysicalColumn.builder()
.name("c_decimal")
.sourceType(DataTypes.DECIMAL(30, 8).toString())
.nullable(false)
.dataType(dataType)
.columnLength(30L)
.defaultValue(3.0)
.scale(8)
.comment("c_decimal_type_define")
.build();
}

@Test
public void paimonToSeaTunnel() {
public void paimonRowTypeToSeaTunnel() {
SeaTunnelRowType convert = RowTypeConverter.convert(rowType);
Assertions.assertEquals(convert, seaTunnelRowType);
}
Expand All @@ -161,4 +197,27 @@ public void seaTunnelToPaimon() {
RowType convert = RowTypeConverter.reconvert(seaTunnelRowType, tableSchema);
Assertions.assertEquals(convert, rowType);
}

/**
 * Converts the Paimon {@code typeDefine} fixture into a SeaTunnel {@link Column} and checks,
 * via {@code isEquals}, that the resulting column agrees with the type definition it came from.
 */
@Test
public void paimonDataTypeToSeaTunnelColumn() {
    // Pass the conversion result straight through instead of shadowing the `column` field.
    isEquals(RowTypeConverter.convert(typeDefine), typeDefine);
}

/**
 * Converts the SeaTunnel {@code column} fixture back into a Paimon {@link BasicTypeDefine} and
 * checks, via {@code isEquals}, that the type definition agrees with the source column.
 */
@Test
public void seaTunnelColumnToPaimonDataType() {
    isEquals(column, RowTypeConverter.reconvert(column));
}

/**
 * Asserts that a SeaTunnel column and a Paimon type definition describe the same field.
 *
 * <p>Compared attributes, in order: comment, length, name, nullability, default value and
 * scale. Finally the string form of the column's data type must match the type definition's
 * column type, ignoring case.
 *
 * @param column SeaTunnel column under comparison
 * @param dataTypeDefine Paimon type definition under comparison
 */
private void isEquals(Column column, BasicTypeDefine<DataType> dataTypeDefine) {
    // Field-by-field metadata comparison; the order is kept so the first failure is unchanged.
    Assertions.assertEquals(column.getComment(), dataTypeDefine.getComment());
    Assertions.assertEquals(column.getColumnLength(), dataTypeDefine.getLength());
    Assertions.assertEquals(column.getName(), dataTypeDefine.getName());
    Assertions.assertEquals(column.isNullable(), dataTypeDefine.isNullable());
    Assertions.assertEquals(column.getDefaultValue(), dataTypeDefine.getDefaultValue());
    Assertions.assertEquals(column.getScale(), dataTypeDefine.getScale());

    // Type names are compared as text, case-insensitively.
    String seaTunnelTypeText = column.getDataType().toString();
    String paimonColumnType = dataTypeDefine.getColumnType();
    boolean sameTypeText = seaTunnelTypeText.equalsIgnoreCase(paimonColumnType);
    Assertions.assertTrue(sameTypeText);
}
}