-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Fix][Connector-V2] Field information lost during Paimon DataType and SeaTunnel Column conversion #6767
[Fix][Connector-V2] Field information lost during Paimon DataType and SeaTunnel Column conversion #6767
Changes from 6 commits
6c102ae
4a5431f
fb43528
2d1cb44
0e91978
8036acb
548e68a
6e3f204
bdb4b77
ea955f3
760fb29
3915e36
a7c0649
e57182c
b068137
f162b5e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,8 +37,11 @@ | |
import static org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR; | ||
import static org.apache.seatunnel.common.exception.CommonErrorCode.JSON_OPERATION_FAILED; | ||
import static org.apache.seatunnel.common.exception.CommonErrorCode.SQL_TEMPLATE_HANDLED_ERROR; | ||
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ARRAY_GENERIC_TYPE; | ||
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE; | ||
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE_SIMPLE; | ||
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ENCODING; | ||
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ROW_KIND; | ||
import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR; | ||
|
||
/** | ||
|
@@ -87,6 +90,14 @@ public static SeaTunnelRuntimeException writeSeaTunnelRowFailed( | |
return new SeaTunnelRuntimeException(WRITE_SEATUNNEL_ROW_ERROR, params, cause); | ||
} | ||
|
||
public static SeaTunnelRuntimeException unsupportedDataType( | ||
String identifier, String dataType) { | ||
Map<String, String> params = new HashMap<>(); | ||
params.put("identifier", identifier); | ||
params.put("dataType", dataType); | ||
return new SeaTunnelRuntimeException(UNSUPPORTED_DATA_TYPE_SIMPLE, params); | ||
} | ||
|
||
public static SeaTunnelRuntimeException unsupportedDataType( | ||
String identifier, String dataType, String field) { | ||
Map<String, String> params = new HashMap<>(); | ||
|
@@ -199,4 +210,19 @@ public static SeaTunnelRuntimeException sqlTemplateHandledError( | |
params.put("optionName", optionName); | ||
return new SeaTunnelRuntimeException(SQL_TEMPLATE_HANDLED_ERROR, params); | ||
} | ||
|
||
public static SeaTunnelRuntimeException unsupportedArrayGenericType( | ||
String identifier, String dataType) { | ||
Map<String, String> params = new HashMap<>(); | ||
params.put("identifier", identifier); | ||
params.put("dataType", dataType); | ||
return new SeaTunnelRuntimeException(UNSUPPORTED_ARRAY_GENERIC_TYPE, params); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
done. |
||
|
||
public static SeaTunnelRuntimeException unsupportedRowKind(String identifier, String rowKind) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we need tableId when invoke There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
OK, I will optimize the code based on your suggestions There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
done. |
||
Map<String, String> params = new HashMap<>(); | ||
params.put("identifier", identifier); | ||
params.put("rowKind", rowKind); | ||
return new SeaTunnelRuntimeException(UNSUPPORTED_ROW_KIND, params); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -23,7 +23,9 @@ | |||||
import org.apache.seatunnel.api.table.type.SeaTunnelDataType; | ||||||
import org.apache.seatunnel.api.table.type.SeaTunnelRow; | ||||||
import org.apache.seatunnel.api.table.type.SeaTunnelRowType; | ||||||
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; | ||||||
import org.apache.seatunnel.common.exception.CommonError; | ||||||
import org.apache.seatunnel.common.exception.CommonErrorCode; | ||||||
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig; | ||||||
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException; | ||||||
|
||||||
import org.apache.paimon.data.BinaryArray; | ||||||
|
@@ -119,10 +121,8 @@ public static Object convert(InternalArray array, SeaTunnelDataType<?> dataType) | |||||
} | ||||||
return doubles; | ||||||
default: | ||||||
String errorMsg = | ||||||
String.format("Array type not support this genericType [%s]", dataType); | ||||||
throw new PaimonConnectorException( | ||||||
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg); | ||||||
throw CommonError.unsupportedArrayGenericType( | ||||||
PaimonConfig.CONNECTOR_IDENTITY, dataType.getSqlType().toString()); | ||||||
} | ||||||
} | ||||||
|
||||||
|
@@ -220,10 +220,8 @@ public static BinaryArray reconvert(Object array, SeaTunnelDataType<?> dataType) | |||||
} | ||||||
break; | ||||||
default: | ||||||
String errorMsg = | ||||||
String.format("Array type not support this genericType [%s]", dataType); | ||||||
throw new PaimonConnectorException( | ||||||
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg); | ||||||
throw CommonError.unsupportedArrayGenericType( | ||||||
PaimonConfig.CONNECTOR_IDENTITY, dataType.getSqlType().toString()); | ||||||
} | ||||||
binaryArrayWriter.complete(); | ||||||
return binaryArray; | ||||||
|
@@ -320,7 +318,7 @@ public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunn | |||||
break; | ||||||
default: | ||||||
throw new PaimonConnectorException( | ||||||
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, | ||||||
CommonErrorCode.UNSUPPORTED_DATA_TYPE, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please throw common error by use this method. seatunnel/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java Line 90 in a5609d6
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
done. |
||||||
"SeaTunnel does not support this type"); | ||||||
} | ||||||
} | ||||||
|
@@ -438,9 +436,9 @@ public static InternalRow reconvert( | |||||
binaryWriter.writeRow(i, paimonRow, new InternalRowSerializer(paimonRowType)); | ||||||
break; | ||||||
default: | ||||||
throw new PaimonConnectorException( | ||||||
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, | ||||||
"Unsupported data type " + seaTunnelRowType.getFieldType(i)); | ||||||
throw CommonError.unsupportedDataType( | ||||||
PaimonConfig.CONNECTOR_IDENTITY, | ||||||
seaTunnelRowType.getFieldType(i).getSqlType().toString()); | ||||||
} | ||||||
} | ||||||
return binaryRow; | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,13 +19,14 @@ | |
|
||
import org.apache.seatunnel.api.table.catalog.Column; | ||
import org.apache.seatunnel.api.table.catalog.PhysicalColumn; | ||
import org.apache.seatunnel.api.table.converter.BasicTypeDefine; | ||
import org.apache.seatunnel.api.table.type.BasicType; | ||
import org.apache.seatunnel.api.table.type.LocalTimeType; | ||
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; | ||
import org.apache.seatunnel.api.table.type.SeaTunnelDataType; | ||
import org.apache.seatunnel.api.table.type.SeaTunnelRowType; | ||
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; | ||
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException; | ||
import org.apache.seatunnel.common.exception.CommonError; | ||
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig; | ||
|
||
import org.apache.paimon.schema.TableSchema; | ||
import org.apache.paimon.types.ArrayType; | ||
|
@@ -80,11 +81,20 @@ public static SeaTunnelRowType convert(RowType rowType) { | |
/** | ||
* Convert Paimon row type {@link DataType} to SeaTunnel row type {@link SeaTunnelDataType} | ||
* | ||
* @param dataType Paimon data type | ||
* @param typeDefine Paimon data type | ||
* @return SeaTunnel data type {@link SeaTunnelDataType} | ||
*/ | ||
public static Column convert(DataType dataType) { | ||
PhysicalColumn.PhysicalColumnBuilder physicalColumnBuilder = PhysicalColumn.builder(); | ||
public static Column convert(BasicTypeDefine<DataType> typeDefine) { | ||
|
||
PhysicalColumn.PhysicalColumnBuilder physicalColumnBuilder = | ||
PhysicalColumn.builder() | ||
.name(typeDefine.getName()) | ||
.sourceType(typeDefine.getColumnType()) | ||
.nullable(typeDefine.isNullable()) | ||
.defaultValue(typeDefine.getDefaultValue()) | ||
.comment(typeDefine.getComment()); | ||
|
||
DataType dataType = typeDefine.getNativeType(); | ||
SeaTunnelDataType<?> seaTunnelDataType; | ||
PaimonToSeaTunnelTypeVisitor paimonToSeaTunnelTypeVisitor = | ||
PaimonToSeaTunnelTypeVisitor.INSTANCE; | ||
|
@@ -164,12 +174,8 @@ public static Column convert(DataType dataType) { | |
seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit((RowType) dataType); | ||
break; | ||
default: | ||
String errorMsg = | ||
String.format( | ||
"Paimon dataType not support this genericType [%s]", | ||
dataType.asSQLString()); | ||
throw new PaimonConnectorException( | ||
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg); | ||
throw CommonError.unsupportedDataType( | ||
PaimonConfig.CONNECTOR_IDENTITY, dataType.asSQLString()); | ||
} | ||
return physicalColumnBuilder.dataType(seaTunnelDataType).build(); | ||
} | ||
|
@@ -209,7 +215,7 @@ public static RowType reconvert(SeaTunnelRowType seaTunnelRowType, TableSchema t | |
* @param column SeaTunnel data type {@link Column} | ||
* @return Paimon data type {@link DataType} | ||
*/ | ||
public static DataType reconvert(Column column) { | ||
public static BasicTypeDefine<DataType> reconvert(Column column) { | ||
return SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(column); | ||
} | ||
|
||
|
@@ -234,18 +240,53 @@ private static class SeaTunnelTypeToPaimonVisitor { | |
|
||
private SeaTunnelTypeToPaimonVisitor() {} | ||
|
||
public DataType visit(Column column) { | ||
public BasicTypeDefine<DataType> visit(Column column) { | ||
BasicTypeDefine.BasicTypeDefineBuilder<DataType> builder = | ||
BasicTypeDefine.<DataType>builder() | ||
.name(column.getName()) | ||
.nullable(column.isNullable()) | ||
.comment(column.getComment()) | ||
.defaultValue(column.getDefaultValue()); | ||
SeaTunnelDataType<?> dataType = column.getDataType(); | ||
Integer scale = column.getScale(); | ||
switch (dataType.getSqlType()) { | ||
case TIMESTAMP: | ||
return DataTypes.TIMESTAMP( | ||
Objects.isNull(scale) ? TimestampType.DEFAULT_PRECISION : scale); | ||
int timestampScale = | ||
Objects.isNull(scale) ? TimestampType.DEFAULT_PRECISION : scale; | ||
TimestampType timestampType = DataTypes.TIMESTAMP(timestampScale); | ||
builder.nativeType(timestampType); | ||
builder.dataType(timestampType.getTypeRoot().name()); | ||
builder.columnType(timestampType.toString()); | ||
builder.scale(timestampScale); | ||
builder.length(column.getColumnLength()); | ||
return builder.build(); | ||
case TIME: | ||
return DataTypes.TIME( | ||
Objects.isNull(scale) ? TimeType.DEFAULT_PRECISION : scale); | ||
int timeScale = Objects.isNull(scale) ? TimeType.DEFAULT_PRECISION : scale; | ||
TimeType timeType = DataTypes.TIME(timeScale); | ||
builder.nativeType(timeType); | ||
builder.columnType(timeType.toString()); | ||
builder.dataType(timeType.getTypeRoot().name()); | ||
builder.scale(timeScale); | ||
builder.length(column.getColumnLength()); | ||
return builder.build(); | ||
case DECIMAL: | ||
int precision = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In some cases, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
done. |
||
((org.apache.seatunnel.api.table.type.DecimalType) dataType) | ||
.getPrecision(); | ||
DecimalType decimalType = DataTypes.DECIMAL(precision, scale); | ||
builder.nativeType(decimalType); | ||
builder.columnType(decimalType.toString()); | ||
builder.dataType(decimalType.getTypeRoot().name()); | ||
builder.scale(scale); | ||
builder.precision((long) precision); | ||
builder.length(column.getColumnLength()); | ||
return builder.build(); | ||
default: | ||
return visit(dataType); | ||
builder.nativeType(visit(dataType)); | ||
builder.columnType(dataType.toString()); | ||
builder.length(column.getColumnLength()); | ||
builder.dataType(dataType.getSqlType().name()); | ||
return builder.build(); | ||
} | ||
} | ||
|
||
|
@@ -301,9 +342,8 @@ public DataType visit(SeaTunnelDataType<?> dataType) { | |
Arrays.stream(fieldTypes).map(this::visit).toArray(DataType[]::new); | ||
return DataTypes.ROW(dataTypes); | ||
default: | ||
throw new PaimonConnectorException( | ||
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, | ||
"Unsupported data type: " + dataType.getSqlType()); | ||
throw CommonError.unsupportedDataType( | ||
PaimonConfig.CONNECTOR_IDENTITY, dataType.getSqlType().toString()); | ||
} | ||
} | ||
} | ||
|
@@ -417,12 +457,9 @@ public SeaTunnelDataType<?> visit(ArrayType arrayType) { | |
case DOUBLE: | ||
return org.apache.seatunnel.api.table.type.ArrayType.DOUBLE_ARRAY_TYPE; | ||
default: | ||
String errorMsg = | ||
String.format( | ||
"Array type not support this genericType [%s]", | ||
seaTunnelArrayType); | ||
throw new PaimonConnectorException( | ||
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg); | ||
throw CommonError.unsupportedArrayGenericType( | ||
PaimonConfig.CONNECTOR_IDENTITY, | ||
seaTunnelArrayType.getSqlType().toString()); | ||
} | ||
} | ||
|
||
|
@@ -445,9 +482,8 @@ public SeaTunnelDataType<?> visit(RowType rowType) { | |
|
||
@Override | ||
protected SeaTunnelDataType defaultMethod(DataType dataType) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Hisoka-X This There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please put fieldname as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
done. |
||
throw new PaimonConnectorException( | ||
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, | ||
"Unsupported data type: " + dataType); | ||
throw CommonError.unsupportedDataType( | ||
PaimonConfig.CONNECTOR_IDENTITY, dataType.getTypeRoot().name()); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please reuse
public static SeaTunnelRuntimeException unsupportedDataType( String identifier, String dataType, String field)
. Because we should make sure user know which field can not supported.