Commit 6ecddb2

rebase

wg1026688210 committed May 13, 2024
1 parent 7ba64ab commit 6ecddb2
Showing 11 changed files with 224 additions and 500 deletions.
@@ -19,15 +19,16 @@
package org.apache.paimon.flink.action;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
import org.apache.paimon.flink.sink.BucketsRowChannelComputer;
import org.apache.paimon.flink.sink.CombinedTableCompactorSink;
import org.apache.paimon.flink.sink.CompactorSinkBuilder;
import org.apache.paimon.flink.sink.MultiTablesCompactorSink;
import org.apache.paimon.flink.source.CombinedTableCompactorSourceBuilder;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.flink.source.MultiTablesCompactorSourceBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
@@ -188,23 +189,34 @@ private void buildForCombinedMode() {
ReadableConfig conf = env.getConfiguration();
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
// TODO: Currently, multi-table compaction doesn't support tables whose bucket mode is UNAWARE.
MultiTablesCompactorSourceBuilder sourceBuilder =
new MultiTablesCompactorSourceBuilder(
CombinedTableCompactorSourceBuilder sourceBuilder =
new CombinedTableCompactorSourceBuilder(
catalogLoader(),
databasePattern,
includingPattern,
excludingPattern,
tableOptions.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis());
DataStream<RowData> source =
sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();

DataStream<RowData> partitioned =
// multi-bucket tables, i.e. tables with multiple buckets per partition,
// such as fixed-bucket and dynamic-bucket tables
DataStream<RowData> awareBucketTableSource =
partition(
source,
sourceBuilder
.withEnv(env)
.withContinuousMode(isStreaming)
.buildAwareBucketTableSource(),
new BucketsRowChannelComputer(),
tableOptions.get(FlinkConnectorOptions.SINK_PARALLELISM));
new MultiTablesCompactorSink(catalogLoader(), tableOptions).sinkFrom(partitioned);

// unaware bucket table
DataStream<MultiTableAppendOnlyCompactionTask> unawareBucketTableSource =
sourceBuilder
.withEnv(env)
.withContinuousMode(isStreaming)
.buildForUnawareBucketsTableSource();

new CombinedTableCompactorSink(catalogLoader(), tableOptions)
.sinkFrom(awareBucketTableSource, unawareBucketTableSource);
}
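
The rewritten buildForCombinedMode() now produces two branches: bucketed tables come out of the builder as a RowData stream that is shuffled by BucketsRowChannelComputer, while unaware-bucket tables come out as MultiTableAppendOnlyCompactionTask records; both feed a single CombinedTableCompactorSink. The partition(...) helper and the channel computer are Paimon internals, so the following is only a rough illustration of the shuffling idea in plain Flink terms, using a hypothetical BucketRecord type rather than Paimon's classes:

```java
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BucketPartitioningSketch {

    /** Hypothetical record identifying one compaction unit (table + bucket); not a Paimon class. */
    public static class BucketRecord {
        public String table;
        public int bucket;

        public BucketRecord() {}

        public BucketRecord(String table, int bucket) {
            this.table = table;
            this.bucket = bucket;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<BucketRecord> source =
                env.fromElements(new BucketRecord("db.t1", 0), new BucketRecord("db.t1", 1));

        // Route every record of the same (table, bucket) to the same downstream channel,
        // which is roughly what the BucketsRowChannelComputer shuffle achieves for RowData.
        DataStream<BucketRecord> partitioned =
                source.partitionCustom(
                        (Partitioner<String>)
                                (key, numPartitions) ->
                                        (key.hashCode() & Integer.MAX_VALUE) % numPartitions,
                        (KeySelector<BucketRecord, String>) r -> r.table + "/" + r.bucket);

        partitioned.print();
        env.execute("bucket-partitioning-sketch");
    }
}
```
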

private void buildForTraditionalCompaction(
@@ -85,7 +85,7 @@ public boolean checkTableScanned(Identifier identifier) {

@Override
public void addScanTable(FileStoreTable fileStoreTable, Identifier identifier) {
if (fileStoreTable.bucketMode() != BucketMode.UNAWARE) {
if (fileStoreTable.bucketMode() != BucketMode.BUCKET_UNAWARE) {
BucketsTable bucketsTable =
new BucketsTable(fileStoreTable, isStreaming, identifier.getDatabaseName())
.copy(compactOptions(isStreaming));
@@ -89,7 +89,7 @@ public boolean checkTableScanned(Identifier identifier) {

@Override
public void addScanTable(FileStoreTable fileStoreTable, Identifier identifier) {
if (fileStoreTable.bucketMode() == BucketMode.UNAWARE) {
if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) {
tablesMap.put(
identifier,
new AppendOnlyTableCompactionCoordinator(fileStoreTable, isStreaming));
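
Together with the previous hunk, these changes route each scanned table by its bucket mode: anything other than BUCKET_UNAWARE is exposed through a BucketsTable and compacted by the bucketed-table pipeline, while BUCKET_UNAWARE (append-only) tables get an AppendOnlyTableCompactionCoordinator and go through the unaware-bucket pipeline. A hedged sketch of that routing decision, with an illustrative helper class that is not part of this commit:

```java
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;

/** Illustrative helper, not part of this commit: shows which pipeline a scanned table goes to. */
final class CompactionPipelineRouter {

    private CompactionPipelineRouter() {}

    static String pipelineFor(FileStoreTable table, Identifier identifier) {
        if (table.bucketMode() == BucketMode.BUCKET_UNAWARE) {
            // Append-only tables without a bucket layout: compaction tasks are planned by
            // AppendOnlyTableCompactionCoordinator and executed by the unaware-bucket worker.
            return "unaware-bucket pipeline: " + identifier.getFullName();
        }
        // Fixed-bucket and dynamic-bucket tables: compaction is driven by rows of the
        // system BucketsTable and executed by the multi-table compact operator.
        return "aware-bucket pipeline: " + identifier.getFullName();
    }
}
```
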
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.manifest.WrappedManifestCommittable;
@@ -48,7 +49,7 @@
import static org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory;

/** A sink for processing multiple tables in a dedicated compaction job. */
public class MultiTablesCompactorSink implements Serializable {
public class CombinedTableCompactorSink implements Serializable {
private static final long serialVersionUID = 1L;

private static final String WRITER_NAME = "Writer";
@@ -59,54 +60,75 @@ public class MultiTablesCompactorSink implements Serializable {

private final Options options;

public MultiTablesCompactorSink(Catalog.Loader catalogLoader, Options options) {
public CombinedTableCompactorSink(Catalog.Loader catalogLoader, Options options) {
this.catalogLoader = catalogLoader;
this.ignorePreviousFiles = false;
this.options = options;
}

public DataStreamSink<?> sinkFrom(DataStream<RowData> input) {
public DataStreamSink<?> sinkFrom(
DataStream<RowData> awareBucketTableSource,
DataStream<MultiTableAppendOnlyCompactionTask> unawareBucketTableSource) {
// This commitUser is valid only for new jobs.
// After the job starts, this commitUser will be recorded into the states of write and
// commit operators.
// When the job restarts, commitUser will be recovered from states and this value is
// ignored.
String initialCommitUser = UUID.randomUUID().toString();
return sinkFrom(input, initialCommitUser);
return sinkFrom(awareBucketTableSource, unawareBucketTableSource, initialCommitUser);
}
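
The comment above describes a common checkpoint-recovery pattern: the freshly generated UUID only matters for brand-new jobs, because on restart the operators prefer the commitUser recorded in their state. A minimal sketch of that pattern with plain Flink state APIs follows; it is illustrative and not Paimon's actual operator code:

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

import java.util.Collections;
import java.util.Iterator;
import java.util.UUID;

/** Schematic of the commitUser handling described above; not Paimon's actual operator code. */
class CommitUserRecoverySketch implements CheckpointedFunction {

    private final String initialCommitUser = UUID.randomUUID().toString();

    private transient ListState<String> commitUserState;
    private transient String commitUser;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        commitUserState =
                context.getOperatorStateStore()
                        .getListState(new ListStateDescriptor<>("commit_user", String.class));
        Iterator<String> restored = commitUserState.get().iterator();
        // A value restored from state wins; the freshly generated UUID is only used the
        // very first time the job runs.
        commitUser = restored.hasNext() ? restored.next() : initialCommitUser;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        commitUserState.update(Collections.singletonList(commitUser));
    }
}
```
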

public DataStreamSink<?> sinkFrom(DataStream<RowData> input, String initialCommitUser) {
public DataStreamSink<?> sinkFrom(
DataStream<RowData> awareBucketTableSource,
DataStream<MultiTableAppendOnlyCompactionTask> unawareBucketTableSource,
String initialCommitUser) {
// do the actual writing; no snapshot is generated in this stage
SingleOutputStreamOperator<MultiTableCommittable> written =
doWrite(input, initialCommitUser, input.getParallelism());
DataStream<MultiTableCommittable> written =
doWrite(awareBucketTableSource, unawareBucketTableSource, initialCommitUser);

// commit the committable to generate a new snapshot
return doCommit(written, initialCommitUser);
}

public SingleOutputStreamOperator<MultiTableCommittable> doWrite(
DataStream<RowData> input, String commitUser, Integer parallelism) {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
public DataStream<MultiTableCommittable> doWrite(
DataStream<RowData> awareBucketTableSource,
DataStream<MultiTableAppendOnlyCompactionTask> unawareBucketTableSource,
String commitUser) {
StreamExecutionEnvironment env = awareBucketTableSource.getExecutionEnvironment();
boolean isStreaming =
env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;

SingleOutputStreamOperator<MultiTableCommittable> written =
input.transform(
WRITER_NAME,
SingleOutputStreamOperator<MultiTableCommittable> multiBucketTableRewriter =
awareBucketTableSource
.transform(
String.format("%s-%s", "Multi-Bucket-Table", WRITER_NAME),
new MultiTableCommittableTypeInfo(),
createWriteOperator(
combinedMultiComacptionWriteOperator(
env.getCheckpointConfig(), isStreaming, commitUser))
.setParallelism(parallelism == null ? input.getParallelism() : parallelism);
.setParallelism(awareBucketTableSource.getParallelism());

SingleOutputStreamOperator<MultiTableCommittable> unawareBucketTableRewriter =
unawareBucketTableSource
.transform(
String.format("%s-%s", "Unaware-Bucket-Table", WRITER_NAME),
new MultiTableCommittableTypeInfo(),
new AppendOnlyMultiTableCompactionWorkerOperator(
catalogLoader, commitUser, options))
.setParallelism(unawareBucketTableSource.getParallelism());

if (!isStreaming) {
assertBatchConfiguration(env, written.getParallelism());
assertBatchConfiguration(env, multiBucketTableRewriter.getParallelism());
assertBatchConfiguration(env, unawareBucketTableRewriter.getParallelism());
}

if (options.get(SINK_USE_MANAGED_MEMORY)) {
declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
declareManagedMemory(
multiBucketTableRewriter, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
declareManagedMemory(
unawareBucketTableRewriter, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
}
return written;
return multiBucketTableRewriter.union(unawareBucketTableRewriter);
}
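
doWrite() now builds two writer operators with different input types, but both emit MultiTableCommittable (note the shared MultiTableCommittableTypeInfo), which is what makes the final union() legal and lets a single committer consume both branches. A minimal, Paimon-free illustration of that pattern, using stand-in element types:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Minimal, Paimon-free illustration of "rewrite both branches to one type, then union". */
public class UnionCommittablesSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-ins for the RowData stream and the MultiTableAppendOnlyCompactionTask stream.
        DataStream<String> rowLike = env.fromElements("row-1", "row-2");
        DataStream<Long> taskLike = env.fromElements(1L, 2L);

        // Each branch is rewritten to a common "committable" type, mirroring how both writer
        // operators above emit MultiTableCommittable.
        DataStream<String> committablesA =
                rowLike.map(r -> "committable:" + r).returns(Types.STRING);
        DataStream<String> committablesB =
                taskLike.map(t -> "committable:task-" + t).returns(Types.STRING);

        // With a single element type, union() is legal and one committer can consume both.
        committablesA.union(committablesB).print();

        env.execute("union-committables-sketch");
    }
}
```
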

protected DataStreamSink<?> doCommit(
@@ -138,8 +160,9 @@ protected DataStreamSink<?> doCommit(
}

// TODO: refactor FlinkSink to adopt this sink
protected OneInputStreamOperator<RowData, MultiTableCommittable> createWriteOperator(
CheckpointConfig checkpointConfig, boolean isStreaming, String commitUser) {
protected OneInputStreamOperator<RowData, MultiTableCommittable>
combinedMultiComacptionWriteOperator(
CheckpointConfig checkpointConfig, boolean isStreaming, String commitUser) {
return new MultiTablesStoreCompactOperator(
catalogLoader,
commitUser,
@@ -18,10 +18,13 @@

package org.apache.paimon.flink.source;

import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.source.operator.MultiTablesBatchCompactorSourceFunction;
import org.apache.paimon.flink.source.operator.MultiTablesStreamingCompactorSourceFunction;
import org.apache.paimon.flink.source.operator.CombinedAwareBatchSourceFunction;
import org.apache.paimon.flink.source.operator.CombinedAwareStreamingSourceFunction;
import org.apache.paimon.flink.source.operator.CombinedUnawareBatchSourceFunction;
import org.apache.paimon.flink.source.operator.CombinedUnawareStreamingSourceFunction;
import org.apache.paimon.table.system.BucketsTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
@@ -35,9 +38,9 @@

/**
* Source builder to build a Flink compactor source for multiple tables. This is for dedicated
* compactor jobs.
* compactor jobs in combined mode.
*/
public class MultiTablesCompactorSourceBuilder {
public class CombinedTableCompactorSourceBuilder {
private final Catalog.Loader catalogLoader;
private final Pattern includingPattern;
private final Pattern excludingPattern;
@@ -47,7 +50,7 @@ public class MultiTablesCompactorSourceBuilder {
private boolean isContinuous = false;
private StreamExecutionEnvironment env;

public MultiTablesCompactorSourceBuilder(
public CombinedTableCompactorSourceBuilder(
Catalog.Loader catalogLoader,
Pattern databasePattern,
Pattern includingPattern,
@@ -60,39 +63,60 @@ public MultiTablesCompactorSourceBuilder(
this.monitorInterval = monitorInterval;
}

public MultiTablesCompactorSourceBuilder withContinuousMode(boolean isContinuous) {
public CombinedTableCompactorSourceBuilder withContinuousMode(boolean isContinuous) {
this.isContinuous = isContinuous;
return this;
}

public MultiTablesCompactorSourceBuilder withEnv(StreamExecutionEnvironment env) {
public CombinedTableCompactorSourceBuilder withEnv(StreamExecutionEnvironment env) {
this.env = env;
return this;
}

public DataStream<RowData> build() {
public DataStream<RowData> buildAwareBucketTableSource() {
Preconditions.checkArgument(env != null, "StreamExecutionEnvironment should not be null.");
RowType produceType = BucketsTable.getRowType();
if (isContinuous) {
return MultiTablesStreamingCompactorSourceFunction.buildSource(
return CombinedAwareStreamingSourceFunction.buildSource(
env,
"MultiTables-StreamingCompactorSource",
"Combine-MultiBucketTables--StreamingCompactorSource",
InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)),
catalogLoader,
includingPattern,
excludingPattern,
databasePattern,
monitorInterval);
} else {
return MultiTablesBatchCompactorSourceFunction.buildSource(
return CombinedAwareBatchSourceFunction.buildSource(
env,
"MultiTables-BatchCompactorSource",
"Combine-MultiBucketTables-BatchCompactorSource",
InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)),
catalogLoader,
includingPattern,
excludingPattern,
databasePattern);
}
}

public DataStream<MultiTableAppendOnlyCompactionTask> buildForUnawareBucketsTableSource() {
Preconditions.checkArgument(env != null, "StreamExecutionEnvironment should not be null.");
if (isContinuous) {
return CombinedUnawareStreamingSourceFunction.buildSource(
env,
"Combined-UnawareBucketTables-StreamingCompactorSource",
catalogLoader,
includingPattern,
excludingPattern,
databasePattern,
monitorInterval);
} else {
return CombinedUnawareBatchSourceFunction.buildSource(
env,
"Combined-UnawareBucketTables-BatchCompactorSource",
catalogLoader,
includingPattern,
excludingPattern,
databasePattern);
}
}
}
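
Putting the renamed classes together, a caller-side sketch of a combined-mode compaction job might look like the following. This is an illustration under assumptions, not code from this commit: the catalog loader and the patterns are placeholders, and the real CompactDatabaseAction additionally partitions the aware-bucket stream with BucketsRowChannelComputer before the sink.

```java
import java.time.Duration;
import java.util.regex.Pattern;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;

import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.flink.sink.CombinedTableCompactorSink;
import org.apache.paimon.flink.source.CombinedTableCompactorSourceBuilder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;

public class CombinedCompactionJobSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        boolean isStreaming =
                env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
                        == RuntimeExecutionMode.STREAMING;

        Catalog.Loader catalogLoader = createCatalogLoader();
        Options tableOptions = new Options();

        CombinedTableCompactorSourceBuilder sourceBuilder =
                new CombinedTableCompactorSourceBuilder(
                        catalogLoader,
                        Pattern.compile("my_db"),           // databasePattern (placeholder)
                        Pattern.compile(".*"),              // includingPattern (placeholder)
                        Pattern.compile("ignored_.*"),      // excludingPattern (placeholder)
                        Duration.ofSeconds(10).toMillis()); // monitorInterval

        DataStream<RowData> awareBucketTables =
                sourceBuilder
                        .withEnv(env)
                        .withContinuousMode(isStreaming)
                        .buildAwareBucketTableSource();
        DataStream<MultiTableAppendOnlyCompactionTask> unawareBucketTables =
                sourceBuilder
                        .withEnv(env)
                        .withContinuousMode(isStreaming)
                        .buildForUnawareBucketsTableSource();

        // Note: CompactDatabaseAction additionally shuffles the aware-bucket stream with
        // BucketsRowChannelComputer before handing it to the sink (see the first hunk above).
        new CombinedTableCompactorSink(catalogLoader, tableOptions)
                .sinkFrom(awareBucketTables, unawareBucketTables);

        env.execute("combined-compaction-sketch");
    }

    private static Catalog.Loader createCatalogLoader() {
        // Assumption: a local filesystem warehouse; swap in your own catalog configuration.
        return () ->
                CatalogFactory.createCatalog(
                        CatalogContext.create(new Path("/tmp/paimon-warehouse")));
    }
}
```
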
