Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Mono.when not respond #226

Open
kevin70 opened this issue Jan 23, 2022 · 1 comment
Open

Mono.when not respond #226

kevin70 opened this issue Jan 23, 2022 · 1 comment

Comments

@kevin70
Copy link
Contributor

kevin70 commented Jan 23, 2022

versions:

java 11
r2dbc-spi: 0.9.0-RELEASE
r2dbc-mysql: 0.9.0-SNAPSHOT

code:

Mono<Void> f1(Connection connection) {
  return Flux.from(connection.createStatement("this is bad sql").execute())
      .flatMap(Result::getRowsUpdated)
      .then();
}

Mono<Void> f2(Connection connection) {
  return Flux.from(connection.createStatement("this is bad sql").execute())
      .flatMap(Result::getRowsUpdated)
      .then();
}

Mono<Void> execWhen(Connection connection) {
  return Mono.when(f1(connection), f2(connection));
}

@Test
void testFailed() {
  var connectionFactory = super.rc.getConnectionFactory();
  Flux.usingWhen(
          connectionFactory.create(),
          connection -> {
            // transaction
            return Flux.from(connection.beginTransaction())
                .thenMany(execWhen(connection))
                .concatWith(connection.commitTransaction())
                .onErrorResume(
                    t -> Mono.from(connection.rollbackTransaction()).then(Mono.error(t)));
          },
          Connection::close)
      .as(StepVerifier::create)
      .expectError(R2dbcBadGrammarException.class)
      .verify(Duration.ofSeconds(10));
}

results:

AssertionError: VerifySubscriber timed out on reactor.core.publisher.FluxUsingWhen$ResourceSubscriber

expect:

R2dbcBadGrammarException

When I use Mono.when when an exception occurs when connecting multiple publishers, the upstream will not return the correct result.

@kevin70
Copy link
Contributor Author

kevin70 commented Jan 23, 2022

Mono<Void> f1(Connection connection) {
  return Flux.from(connection.createStatement("this is bad sql").execute())
      .flatMap(Result::getRowsUpdated)
      .then();
}
Mono<Void> f2(Connection connection) {
  return Flux.from(connection.createStatement("this is bad sql").execute())
      .flatMap(Result::getRowsUpdated)
      .then();
}
Mono<Void> execThen(Connection connection) {
  return f1(connection).then(f2(connection));
}
@Test
void testSuccess() {
  var connectionFactory = super.rc.getConnectionFactory();
  Flux.usingWhen(
          connectionFactory.create(),
          connection -> {
            // transaction
            return Flux.from(connection.beginTransaction())
                .thenMany(execThen(connection))
                .concatWith(connection.commitTransaction())
                .onErrorResume(
                    t -> Mono.from(connection.rollbackTransaction()).then(Mono.error(t)));
          },
          Connection::close)
      .as(StepVerifier::create)
      .expectError(R2dbcBadGrammarException.class)
      .verify(Duration.ofSeconds(10));
}

I modify the code to get the correct result in the above case.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant