Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added the field localCorrelationData to MqttPublish for better flow-handling in reactive-APIs #546

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

codepitbull
Copy link

Motivation
Using the reactive integrations (rxjava2/reactor) results in a situation where after an MqttPublish has been processed a Mqtt5PublishResult is forward in the stream. This object doesn't contain any processing context which makes a few usecases rather unintuitive to implement.
In my case I am receiving from Kafka and want to commit AFTER the message has been sent to HiveMQ. For that I need to get the commit-offset to the end of the stream.

Changes
To support the above mentioned usecase I added a localCorrelationData (similar to what Kafka is doing in their client-lib). This field is never propagated to HiveMQ but can be used down the streams. In my case I store the Kafka-Commit-Offset in there and use it to trigger the actual commit.

@cla-bot
Copy link

cla-bot bot commented Oct 11, 2022

Thank you for your pull request and welcome to our community. We require contributors to sign our Contributor License Agreement, and we don't seem to have the users @codepitbull on file. In order for us to review and merge your code, please sign our Contributor License Agreement to get yourself added. You'll find the CLA and more information here: https://github.com/hivemq/hivemq-community/blob/master/CONTRIBUTING.adoc#contributor-license-agreement

Comment on lines 80 to 84
/**
* @return the optional local correlation data of this Publish message. This data is never propagated and kept
* locally for correlation.
*/
Object localCorrelationData();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API needs to be discussed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean the usage of object (yep, could replace it with a generic but that would change general signature of the class) or something else?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not like a generic here. But maybe there would be a case for creating a more sophisticated structure like for user properties. Having just plain Object there is not very expandable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@YannickWeber any objections to merging this and then punting the improvement (that may or may not be needed) for a future date? Object isn't "slick" but it serves the purpose for now without complexity. Plus this isn't a customer requested feature.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I have objections. This is API, API can only change in major versions, therefore we need to be very cautious what we are adding and need to double check for expandability and clarity to make it future-proof.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to double check for expandability and clarity to make it future-proof

One addition, I agree on this point completely.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with moving cautiously, just to provide some context:
The newly added API is strictly for local correlation when inside a reactive pipeline.
I used Object because it was the least invasive and most flexible in that case.
With some fighting we might turn this into a generic.
We could also generate some id for these local interactions which I think would be a little much since this must never be transferred over the wire.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course we can add API in minor versions, but we can only remove methods in major versions. Therefore, I want us to be very careful about API design, as we can't easily correct potential API issues.

In the client world, things aren't as strict since package updates are manual.

I would argue that API stability and caution is of high importance independent of where it is used.

My suggestion would be to not return an Object but rather add an Interface LocalCorrelationData. So that we can expand the return value easier in the future. But that also does not mitigate the problem that you always have to unsafely cast when using the correlation data object :(

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An unsafe cast can be avoided with an instanceof, which got a lot nicer with recent Java-versions.

From a type-perspective it would be best to introduce a generic, like Ractor-Kafka is doing it (https://github.com/reactor/reactor-kafka/blob/main/src/main/java/reactor/kafka/sender/SenderRecord.java#L30).

Everything without a generic will require an unsafe cast at some point.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A final decision would be ideal. Let me know if there are any other thoughts.

Comment on lines 112 to 116
/**
* @return the optional local correlation data of this Publish message. This data is never propagated and kept
* locally for correlation.
*/
Object getLocalCorrelationData();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API needs to be discussed.

@pglombardo pglombardo self-requested a review as a code owner February 15, 2023 13:21
@cla-bot
Copy link

cla-bot bot commented Feb 15, 2023

Thank you for your pull request and welcome to our community. We require contributors to sign our Contributor License Agreement, and we don't seem to have the users @codepitbull on file. In order for us to review and merge your code, please sign our Contributor License Agreement to get yourself added. You'll find the CLA and more information here: https://github.com/hivemq/hivemq-community/blob/master/CONTRIBUTING.adoc#contributor-license-agreement

@cla-bot
Copy link

cla-bot bot commented Feb 15, 2023

Thank you for your pull request and welcome to our community. We require contributors to sign our Contributor License Agreement, and we don't seem to have the users @codepitbull on file. In order for us to review and merge your code, please sign our Contributor License Agreement to get yourself added. You'll find the CLA and more information here: https://github.com/hivemq/hivemq-community/blob/master/CONTRIBUTING.adoc#contributor-license-agreement

@pglombardo
Copy link
Contributor

pglombardo commented Feb 15, 2023

@cla-bot u so lazy

@pglombardo
Copy link
Contributor

@cla-bot check

@cla-bot cla-bot bot added the cla-signed label Feb 15, 2023
@cla-bot
Copy link

cla-bot bot commented Feb 15, 2023

The cla-bot has been summoned, and re-checked this pull request!

@pglombardo
Copy link
Contributor

Hi all - let's wrap up this PR this week if possible so it doesn't go stale.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants