Skip to content

Commit

Permalink
source factory
Browse files Browse the repository at this point in the history
  • Loading branch information
ClownXC committed Apr 14, 2024
1 parent 4f4fd7b commit 3a02242
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;

import com.google.auto.service.AutoService;
Expand All @@ -35,7 +36,7 @@ public Catalog createCatalog(String catalogName, ReadonlyConfig readonlyConfig)

@Override
public String factoryIdentifier() {
return "Paimon";
return PaimonConfig.CONNECTOR_IDENTITY;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.Map;

import static java.util.stream.Collectors.toList;
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;

/**
* Utility class to store configuration options, used by {@link SeaTunnelSource} and {@link
Expand All @@ -44,6 +43,8 @@
@Getter
public class PaimonConfig implements Serializable {

public static final String CONNECTOR_IDENTITY = "Paimon";

public static final Option<String> WAREHOUSE =
Options.key("warehouse")
.stringType()
Expand Down Expand Up @@ -103,20 +104,15 @@ public class PaimonConfig implements Serializable {
protected String hadoopConfPath;

public PaimonConfig(ReadonlyConfig readonlyConfig) {
this.catalogName = checkArgumentNotNull(readonlyConfig.get(CATALOG_NAME));
this.warehouse = checkArgumentNotNull(readonlyConfig.get(WAREHOUSE));
this.namespace = checkArgumentNotNull(readonlyConfig.get(DATABASE));
this.table = checkArgumentNotNull(readonlyConfig.get(TABLE));
this.catalogName = readonlyConfig.get(CATALOG_NAME);
this.warehouse = readonlyConfig.get(WAREHOUSE);
this.namespace = readonlyConfig.get(DATABASE);
this.table = readonlyConfig.get(TABLE);
this.hdfsSitePath = readonlyConfig.get(HDFS_SITE_PATH);
this.hadoopConfProps = readonlyConfig.get(HADOOP_CONF);
this.hadoopConfPath = readonlyConfig.get(HADOOP_CONF_PATH);
}

protected <T> T checkArgumentNotNull(T argument) {
checkNotNull(argument);
return argument;
}

@VisibleForTesting
public static List<String> stringToList(String value, String regex) {
if (value == null || value.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,101 +17,43 @@

package org.apache.seatunnel.connectors.seatunnel.paimon.source;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;

import com.google.auto.service.AutoService;

import java.util.HashMap;
import java.util.Map;

import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.HDFS_SITE_PATH;
import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.TABLE;
import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.WAREHOUSE;
import java.util.Collections;
import java.util.List;

/** Paimon connector source class. */
@AutoService(SeaTunnelSource.class)
public class PaimonSource
implements SeaTunnelSource<SeaTunnelRow, PaimonSourceSplit, PaimonSourceState> {

private static final long serialVersionUID = 1L;

public static final String PLUGIN_NAME = "Paimon";

private Config pluginConfig;

private CatalogTable catalogTable;
private PaimonConfig sourceConfig;
private SeaTunnelRowType seaTunnelRowType;

private Table table;

@Override
public String getPluginName() {
return PLUGIN_NAME;
return PaimonConfig.CONNECTOR_IDENTITY;
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
this.pluginConfig = pluginConfig;
final CheckResult result =
CheckConfigUtil.checkAllExists(
pluginConfig, WAREHOUSE.key(), DATABASE.key(), TABLE.key());
if (!result.isSuccess()) {
throw new PaimonConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
// initialize paimon table
final String warehouse = pluginConfig.getString(WAREHOUSE.key());
final String database = pluginConfig.getString(DATABASE.key());
final String table = pluginConfig.getString(TABLE.key());
final Map<String, String> optionsMap = new HashMap<>();
optionsMap.put(WAREHOUSE.key(), warehouse);
final Options options = Options.fromMap(optionsMap);
final Configuration hadoopConf = new Configuration();
if (pluginConfig.hasPath(HDFS_SITE_PATH.key())) {
hadoopConf.addResource(new Path(pluginConfig.getString(HDFS_SITE_PATH.key())));
}
final CatalogContext catalogContext = CatalogContext.create(options, hadoopConf);
try (Catalog catalog = CatalogFactory.createCatalog(catalogContext)) {
Identifier identifier = Identifier.create(database, table);
this.table = catalog.getTable(identifier);
} catch (Exception e) {
String errorMsg =
String.format(
"Failed to get table [%s] from database [%s] on warehouse [%s]",
database, table, warehouse);
throw new PaimonConnectorException(
PaimonConnectorErrorCode.GET_TABLE_FAILED, errorMsg, e);
}
public PaimonSource(PaimonConfig sourceConfig, CatalogTable catalogTable, Table paimonTable) {
this.sourceConfig = sourceConfig;
this.catalogTable = catalogTable;
this.table = paimonTable;
// TODO: Support column projection
seaTunnelRowType = RowTypeConverter.convert(this.table.rowType());
this.seaTunnelRowType = RowTypeConverter.convert(this.table.rowType());
}

@Override
Expand All @@ -120,8 +62,8 @@ public Boundedness getBoundedness() {
}

@Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return seaTunnelRowType;
public List<CatalogTable> getProducedCatalogTables() {
return Collections.singletonList(catalogTable);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,27 @@

package org.apache.seatunnel.connectors.seatunnel.paimon.source;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalogFactory;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;

import org.apache.paimon.table.Table;

import com.google.auto.service.AutoService;

import java.io.Serializable;

@AutoService(Factory.class)
public class PaimonSourceFactory implements TableSourceFactory {

Expand All @@ -47,4 +60,19 @@ public OptionRule optionRule() {
public Class<? extends SeaTunnelSource> getSourceClass() {
return PaimonSource.class;
}

@Override
public <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
ReadonlyConfig config = context.getOptions();
PaimonConfig sourceConfig = new PaimonConfig(config);
PaimonCatalogFactory factory = new PaimonCatalogFactory();
Catalog catalog = factory.createCatalog(PaimonConfig.CONNECTOR_IDENTITY, config);
catalog.open();
CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(config);
Table paimonTable = ((PaimonCatalog) catalog).getPaimonTable(catalogTable.getTablePath());
return () ->
(SeaTunnelSource<T, SplitT, StateT>)
new PaimonSource(sourceConfig, catalogTable, paimonTable);
}
}

0 comments on commit 3a02242

Please sign in to comment.