Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2] Support TableSourceFactory on Paimon #6695

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,24 @@
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class PaimonCatalogFactory implements CatalogFactory {

public static final String IDENTIFIER = PaimonConfig.CONNECTOR_IDENTITY;

@Override
public Catalog createCatalog(String catalogName, ReadonlyConfig readonlyConfig) {
return new PaimonCatalog(catalogName, new PaimonSinkConfig(readonlyConfig));
}

@Override
public String factoryIdentifier() {
return "Paimon";
return PaimonConfig.CONNECTOR_IDENTITY;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public PaimonCatalogLoader(PaimonSinkConfig paimonSinkConfig) {
}

public Catalog loadCatalog() {
// When using the seatunel engine, set the current class loader to prevent loading failures
// When using the seatunnel engine, set the current class loader to prevent loading failures
Thread.currentThread().setContextClassLoader(PaimonCatalogLoader.class.getClassLoader());
final Map<String, String> optionsMap = new HashMap<>(1);
optionsMap.put(WAREHOUSE.key(), warehouse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.Map;

import static java.util.stream.Collectors.toList;
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;

/**
* Utility class to store configuration options, used by {@link SeaTunnelSource} and {@link
Expand All @@ -44,6 +43,8 @@
@Getter
public class PaimonConfig implements Serializable {

public static final String CONNECTOR_IDENTITY = "Paimon";

public static final Option<String> WAREHOUSE =
Options.key("warehouse")
.stringType()
Expand Down Expand Up @@ -103,20 +104,15 @@ public class PaimonConfig implements Serializable {
protected String hadoopConfPath;

public PaimonConfig(ReadonlyConfig readonlyConfig) {
this.catalogName = checkArgumentNotNull(readonlyConfig.get(CATALOG_NAME));
this.warehouse = checkArgumentNotNull(readonlyConfig.get(WAREHOUSE));
this.namespace = checkArgumentNotNull(readonlyConfig.get(DATABASE));
this.table = checkArgumentNotNull(readonlyConfig.get(TABLE));
this.catalogName = readonlyConfig.get(CATALOG_NAME);
this.warehouse = readonlyConfig.get(WAREHOUSE);
this.namespace = readonlyConfig.get(DATABASE);
this.table = readonlyConfig.get(TABLE);
this.hdfsSitePath = readonlyConfig.get(HDFS_SITE_PATH);
this.hadoopConfProps = readonlyConfig.get(HADOOP_CONF);
this.hadoopConfPath = readonlyConfig.get(HADOOP_CONF_PATH);
}

protected <T> T checkArgumentNotNull(T argument) {
checkNotNull(argument);
return argument;
}

@VisibleForTesting
public static List<String> stringToList(String value, String regex) {
if (value == null || value.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,101 +17,38 @@

package org.apache.seatunnel.connectors.seatunnel.paimon.source;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;

import com.google.auto.service.AutoService;

import java.util.HashMap;
import java.util.Map;

import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.HDFS_SITE_PATH;
import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.TABLE;
import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.WAREHOUSE;
import java.util.Collections;
import java.util.List;

/** Paimon connector source class. */
@AutoService(SeaTunnelSource.class)
public class PaimonSource
implements SeaTunnelSource<SeaTunnelRow, PaimonSourceSplit, PaimonSourceState> {

private static final long serialVersionUID = 1L;

public static final String PLUGIN_NAME = "Paimon";

private Config pluginConfig;

private SeaTunnelRowType seaTunnelRowType;

private CatalogTable catalogTable;
private PaimonConfig sourceConfig;
private Table table;

@Override
public String getPluginName() {
return PLUGIN_NAME;
return PaimonConfig.CONNECTOR_IDENTITY;
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
this.pluginConfig = pluginConfig;
final CheckResult result =
CheckConfigUtil.checkAllExists(
pluginConfig, WAREHOUSE.key(), DATABASE.key(), TABLE.key());
if (!result.isSuccess()) {
throw new PaimonConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
// initialize paimon table
final String warehouse = pluginConfig.getString(WAREHOUSE.key());
final String database = pluginConfig.getString(DATABASE.key());
final String table = pluginConfig.getString(TABLE.key());
final Map<String, String> optionsMap = new HashMap<>();
optionsMap.put(WAREHOUSE.key(), warehouse);
final Options options = Options.fromMap(optionsMap);
final Configuration hadoopConf = new Configuration();
if (pluginConfig.hasPath(HDFS_SITE_PATH.key())) {
hadoopConf.addResource(new Path(pluginConfig.getString(HDFS_SITE_PATH.key())));
}
final CatalogContext catalogContext = CatalogContext.create(options, hadoopConf);
try (Catalog catalog = CatalogFactory.createCatalog(catalogContext)) {
Identifier identifier = Identifier.create(database, table);
this.table = catalog.getTable(identifier);
} catch (Exception e) {
String errorMsg =
String.format(
"Failed to get table [%s] from database [%s] on warehouse [%s]",
database, table, warehouse);
throw new PaimonConnectorException(
PaimonConnectorErrorCode.GET_TABLE_FAILED, errorMsg, e);
}
// TODO: Support column projection
seaTunnelRowType = RowTypeConverter.convert(this.table.rowType());
public PaimonSource(PaimonConfig sourceConfig, CatalogTable catalogTable, Table paimonTable) {
this.sourceConfig = sourceConfig;
this.catalogTable = catalogTable;
this.table = paimonTable;
}

@Override
Expand All @@ -120,14 +57,14 @@ public Boundedness getBoundedness() {
}

@Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return seaTunnelRowType;
public List<CatalogTable> getProducedCatalogTables() {
return Collections.singletonList(catalogTable);
}

@Override
public SourceReader<SeaTunnelRow, PaimonSourceSplit> createReader(
SourceReader.Context readerContext) throws Exception {
return new PaimonSourceReader(readerContext, table, seaTunnelRowType);
return new PaimonSourceReader(readerContext, table, catalogTable.getSeaTunnelRowType());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,31 @@

package org.apache.seatunnel.connectors.seatunnel.paimon.source;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalogFactory;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;

import org.apache.paimon.table.Table;

import com.google.auto.service.AutoService;

import java.io.Serializable;
import java.util.Optional;

import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.WAREHOUSE;

@AutoService(Factory.class)
public class PaimonSourceFactory implements TableSourceFactory {

Expand All @@ -36,7 +53,7 @@ public String factoryIdentifier() {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(PaimonConfig.WAREHOUSE)
.required(WAREHOUSE)
.required(PaimonConfig.DATABASE)
.required(PaimonConfig.TABLE)
.optional(PaimonConfig.HDFS_SITE_PATH)
Expand All @@ -47,4 +64,25 @@ public OptionRule optionRule() {
public Class<? extends SeaTunnelSource> getSourceClass() {
return PaimonSource.class;
}

@Override
public <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
ReadonlyConfig config = context.getOptions();
PaimonConfig sourceConfig = new PaimonConfig(config);
Optional<Catalog> optionalCatalog =
FactoryUtil.createOptionalCatalog(
sourceConfig.getCatalogName(),
config,
sourceConfig.getClass().getClassLoader(),
PaimonCatalogFactory.IDENTIFIER);
PaimonCatalog paimonCatalog = (PaimonCatalog) optionalCatalog.get();
TablePath tablePath = TablePath.of(sourceConfig.getNamespace(), sourceConfig.getTable());
Table paimonTable = paimonCatalog.getPaimonTable(tablePath);
CatalogTable catalogTable = paimonCatalog.getTable(tablePath);

return () ->
(SeaTunnelSource<T, SplitT, StateT>)
new PaimonSource(sourceConfig, catalogTable, paimonTable);
}
}