Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Chore]fix magicnumber with thread sleep. #6741

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class AmazonDynamoDBSourceReader
protected AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
protected SeaTunnelRowDeserializer seaTunnelRowDeserializer;
Queue<AmazonDynamoDBSourceSplit> pendingSplits = new ConcurrentLinkedDeque<>();
private static final long SLEEP_TIME_MS = 2000L;

private volatile boolean noMoreSplit;

Expand Down Expand Up @@ -83,7 +84,6 @@ public void close() {
}

@Override
@SuppressWarnings("magicnumber")
public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
synchronized (output.getCheckpointLock()) {
AmazonDynamoDBSourceSplit split = pendingSplits.poll();
Expand All @@ -95,7 +95,7 @@ public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException
// signal to the source that we have reached the end of the data.
log.info("Closed the bounded amazonDynamodb source");
context.signalNoMoreElement();
Thread.sleep(2000L);
Thread.sleep(SLEEP_TIME_MS);
}
}
if (Objects.nonNull(split)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class JdbcConnectionFactory implements JdbcConnection.ConnectionFactory {

private final JdbcSourceConfig sourceConfig;
private final JdbcConnectionPoolFactory jdbcConnectionPoolFactory;
private static final long SLEEP_TIME_MS = 300L;

public JdbcConnectionFactory(
JdbcSourceConfig sourceConfig, JdbcConnectionPoolFactory jdbcConnectionPoolFactory) {
Expand Down Expand Up @@ -65,7 +66,7 @@ public Connection connect(JdbcConfiguration config) throws SQLException {
} catch (SQLException e) {
if (i < connectRetryTimes - 1) {
try {
Thread.sleep(300);
Thread.sleep(SLEEP_TIME_MS);
} catch (InterruptedException ie) {
throw new SeaTunnelException(
"Failed to get connection, interrupted while doing another attempt",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public abstract class AbstractJdbcSourceChunkSplitter implements JdbcSourceChunk

private final JdbcSourceConfig sourceConfig;
private final JdbcDataSourceDialect dialect;
private static final long SLEEP_TIME_MS = 100L;

public AbstractJdbcSourceChunkSplitter(
JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
Expand Down Expand Up @@ -435,12 +436,11 @@ protected int ObjectCompare(Object obj1, Object obj2) {
return ObjectUtils.compare(obj1, obj2);
}

@SuppressWarnings("MagicNumber")
private static void maySleep(int count, TableId tableId) {
// every 100 queries to sleep 1s
if (count % 10 == 0) {
try {
Thread.sleep(100);
Thread.sleep(SLEEP_TIME_MS);
} catch (InterruptedException e) {
// nothing to do
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
private Lsn defaultStartingPos;
private SlotCreationResult slotCreationInfo;
private boolean hasInitedSlot;
private static final long SLEEP_TIME_MS = 10L;

/**
* Creates a new replication connection with the given params.
Expand Down Expand Up @@ -721,7 +722,7 @@ private PGReplicationStream startPgReplicationStream(
// TODO DBZ-508 get rid of this
// Needed by tests when connections are opened and closed in a fast sequence
try {
Thread.sleep(10);
Thread.sleep(SLEEP_TIME_MS);
} catch (Exception e) {
}
stream.forceUpdateStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class MultiTableSinkWriter
private final ExecutorService executorService;
private MultiTableResourceManager resourceManager;
private volatile boolean submitted = false;
private static final long SLEEP_TIME_MS = 100L;

public MultiTableSinkWriter(
Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriters, int queueSize) {
Expand Down Expand Up @@ -305,7 +306,7 @@ private void checkQueueRemain() {
try {
for (BlockingQueue<SeaTunnelRow> blockingQueue : blockingQueues) {
while (!blockingQueue.isEmpty()) {
Thread.sleep(100);
Thread.sleep(SLEEP_TIME_MS);
subSinkErrorCheck();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
protected SplitContext<T, SplitStateT> currentSplitContext;
private Collector<T> currentSplitOutput;
private boolean noMoreSplitsAssignment;
private static final long SLEEP_TIME_MS = 100L;

public SourceReaderBase(
BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
Expand Down Expand Up @@ -162,7 +163,7 @@ private RecordsWithSplitIds<E> getNextFetch(Collector<T> output) {
if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, output)) {
try {
log.trace("Current fetch is finished.");
Thread.sleep(100);
Thread.sleep(SLEEP_TIME_MS);
} catch (InterruptedException e) {
throw new SeaTunnelException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class DorisValueReader {

protected SeaTunnelRowType seaTunnelRowType;
protected boolean asyncThreadStarted;
private static final long SLEEP_TIME_MS = 5L;

public DorisValueReader(
PartitionDefinition partition, DorisConfig config, SeaTunnelRowType seaTunnelRowType) {
Expand Down Expand Up @@ -205,7 +206,7 @@ public boolean hasNext() {
} else {
// wait for rowBatch put in queue or eos change
try {
Thread.sleep(5);
Thread.sleep(SLEEP_TIME_MS);
} catch (InterruptedException e) {
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSp
private volatile boolean noMoreSplit;
private final long minSplitReadInterval;
private volatile long latestTimestamp = 0;
private static final long SLEEP_TIME_MS = 1000L;

public FakeSourceReader(
SourceReader.Context context,
Expand Down Expand Up @@ -83,7 +84,6 @@ public void close() {
}

@Override
@SuppressWarnings("MagicNumber")
public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
long currentTimestamp = Instant.now().toEpochMilli();
if (currentTimestamp <= latestTimestamp + minSplitReadInterval) {
Expand Down Expand Up @@ -117,7 +117,7 @@ public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException
log.info("Closed the bounded fake source");
context.signalNoMoreElement();
}
Thread.sleep(1000L);
Thread.sleep(SLEEP_TIME_MS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class BaseFileSourceReader implements SourceReader<SeaTunnelRow, FileSour
private final SourceReader.Context context;
private final Deque<FileSourceSplit> sourceSplits = new ConcurrentLinkedDeque<>();
private volatile boolean noMoreSplit;
private static final long SLEEP_TIME_MS = 1000L;

public BaseFileSourceReader(ReadStrategy readStrategy, SourceReader.Context context) {
this.readStrategy = readStrategy;
Expand Down Expand Up @@ -69,7 +70,7 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
log.info("Closed the bounded File source");
context.signalNoMoreElement();
} else {
Thread.sleep(1000L);
Thread.sleep(SLEEP_TIME_MS);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class HttpSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS);
private boolean noMoreElementFlag = true;
private Optional<PageInfo> pageInfoOptional = Optional.empty();
private static final long SLEEP_TIME_MS = 10L;

public HttpSourceReader(
HttpParameter httpParameter,
Expand Down Expand Up @@ -167,7 +168,7 @@ public void internalPollNext(Collector<SeaTunnelRow> output) throws Exception {
updateRequestParam(info);
pollAndCollectData(output);
pageIndex += 1;
Thread.sleep(10);
Thread.sleep(SLEEP_TIME_MS);
}
} else {
pollAndCollectData(output);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
@Slf4j
public class DynamicChunkSplitter extends ChunkSplitter {

private static final long SLEEP_TIME_MS = 100L;

public DynamicChunkSplitter(JdbcSourceConfig config) {
super(config);
}
Expand Down Expand Up @@ -439,7 +441,7 @@ private static void maySleep(int count, TablePath tablePath) {
// every 100 queries to sleep 1s
if (count % 10 == 0) {
try {
Thread.sleep(100);
Thread.sleep(SLEEP_TIME_MS);
} catch (InterruptedException e) {
// nothing to do
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class JdbcSourceReader implements SourceReader<SeaTunnelRow, JdbcSourceSp
private final JdbcInputFormat inputFormat;
private final Deque<JdbcSourceSplit> splits = new ConcurrentLinkedDeque<>();
private volatile boolean noMoreSplit;
private static final long SLEEP_TIME_MS = 1000L;

public JdbcSourceReader(
Context context, JdbcSourceConfig config, Map<TablePath, CatalogTable> tables) {
Expand All @@ -58,7 +59,6 @@ public void close() throws IOException {
}

@Override
@SuppressWarnings("magicnumber")
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
synchronized (output.getCheckpointLock()) {
JdbcSourceSplit split = splits.poll();
Expand All @@ -77,7 +77,7 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
log.info("Closed the bounded jdbc source");
context.signalNoMoreElement();
} else {
Thread.sleep(1000L);
Thread.sleep(SLEEP_TIME_MS);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class KuduSourceReader implements SourceReader<SeaTunnelRow, KuduSourceSp
boolean noMoreSplit;

private final Map<TablePath, SeaTunnelRowType> tables;
private static final long SLEEP_TIME_MS = 1000L;

public KuduSourceReader(Context context, KuduSourceConfig kuduSourceConfig) {
this.context = context;
Expand Down Expand Up @@ -99,7 +100,7 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
log.info("Closed the bounded kudu source");
context.signalNoMoreElement();
} else {
Thread.sleep(1000L);
Thread.sleep(SLEEP_TIME_MS);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class PaimonSourceReader implements SourceReader<SeaTunnelRow, PaimonSour
private final Table table;
private final SeaTunnelRowType seaTunnelRowType;
private volatile boolean noMoreSplit;
private static final long SLEEP_TIME_MS = 1000L;

public PaimonSourceReader(Context context, Table table, SeaTunnelRowType seaTunnelRowType) {
this.context = context;
Expand Down Expand Up @@ -86,7 +87,7 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
context.signalNoMoreElement();
} else {
log.warn("Waiting for flink table source split, sleeping 1s");
Thread.sleep(1000L);
Thread.sleep(SLEEP_TIME_MS);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class InMemorySinkWriter

// use a daemon thread to test classloader leak
private static final Thread THREAD;
private static final long SLEEP_TIME_MS = 1000L;

static {
// use the daemon thread to always hold the classloader
Expand All @@ -47,7 +48,7 @@ public class InMemorySinkWriter
() -> {
while (true) {
try {
Thread.sleep(1000);
Thread.sleep(SLEEP_TIME_MS);
System.out.println(classLoader);
} catch (InterruptedException e) {
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public class CheckpointCoordinator {
private PendingCheckpoint savepointPendingCheckpoint;

private final String checkpointStateImapKey;
private static final long SLEEP_TIME_MS = 500L;

@SneakyThrows
public CheckpointCoordinator(
Expand Down Expand Up @@ -490,7 +491,7 @@ public PassiveCompletableFuture<CompletedCheckpoint> startSavepoint() {
CompletableFuture<PendingCheckpoint> savepoint;
synchronized (lock) {
while (pendingCounter.get() > 0 && !shutdown) {
Thread.sleep(500);
Thread.sleep(SLEEP_TIME_MS);
}
if (shutdown || isCompleted()) {
return completableFutureWithError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ public class PhysicalVertex {

public volatile boolean isRunning = false;

private static final long SLEEP_TIME_MS = 2000L;

/** The error throw by physicalVertex, should be set when physicalVertex throw error. */
private AtomicReference<String> errorByPhysicalVertex = new AtomicReference<>();

Expand Down Expand Up @@ -441,7 +443,7 @@ private void noticeTaskExecutionServiceCancel() {
"%s cancel failed with Exception: %s, retry %s",
this.getTaskFullName(), ExceptionUtils.getMessage(e), i));
try {
Thread.sleep(2000);
Thread.sleep(SLEEP_TIME_MS);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ public abstract class SeaTunnelTask extends AbstractTask {

private SeaTunnelMetricsContext metricsContext;

private static final long SLEEP_TIME_MS = 100L;

public SeaTunnelTask(long jobID, TaskLocation taskID, int indexID, Flow executionFlow) {
super(jobID, taskID);
this.indexID = indexID;
Expand Down Expand Up @@ -151,14 +153,14 @@ protected void stateProcess() throws Exception {
currState = READY_START;
reportTaskStatus(READY_START);
} else {
Thread.sleep(100);
Thread.sleep(SLEEP_TIME_MS);
}
break;
case READY_START:
if (startCalled) {
currState = STARTING;
} else {
Thread.sleep(100);
Thread.sleep(SLEEP_TIME_MS);
}
break;
case STARTING:
Expand All @@ -174,7 +176,7 @@ protected void stateProcess() throws Exception {
if (closeCalled) {
currState = CLOSED;
} else {
Thread.sleep(100);
Thread.sleep(SLEEP_TIME_MS);
}
break;
case CLOSED:
Expand Down