
Strange behavior of the streams.publish procedure inside apoc.periodic.repeat procedure #486

Open
doctor3030 opened this issue Aug 21, 2021 · 3 comments

Comments

@doctor3030

doctor3030 commented Aug 21, 2021


Assume a background process:
CALL apoc.periodic.repeat("process_name",
"MATCH (n:Label)
WITH n.prp1 AS val1, n.prop2 AS val2
CALL streams.publish('kafka_topic', {val1: val1, val2: val2})
RETURN null", 20)

When created, this process does not send anything to Kafka.
When streams.publish is replaced with a dummy statement like counter = counter + 1, it works.
When streams.publish is called manually outside apoc.periodic.repeat, it also works (messages are published).

I was able to work around this by wrapping streams.publish in a dummy apoc.do.when call, like this:
CALL apoc.periodic.repeat("process_name",
"MATCH (n:Label)
WITH n.prp1 AS val1, n.prop2 AS val2
CALL apoc.do.when(true,
'CALL streams.publish(\"kafka_topic\", {val1: val1, val2: val2}) RETURN null',
'RETURN null', {val1: val1, val2: val2}) YIELD value
RETURN null", 20)

Maybe I am missing something, but this behavior is not documented anywhere. I only found a solution because I already had a similar apoc.periodic.repeat process that called streams.publish inside apoc.do.when, and that one worked perfectly.
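A side note on quoting, separate from the transaction behavior: the second argument of apoc.periodic.repeat is itself a Cypher string literal, so any double quotes inside the inner statement have to be escaped, as the workaround does with \"kafka_topic\". If these statements are generated from application code, a small helper can take care of the escaping. The Python sketch below is a minimal illustration under that assumption. The helper names cypher_string_literal and periodic_repeat_call are hypothetical, and the code only builds the query text: it does not connect to Neo4j and does not change how streams.publish behaves inside apoc.periodic.repeat.

```python
def cypher_string_literal(text: str) -> str:
    """Return `text` as a double-quoted Cypher string literal.

    Backslashes are escaped first, so the backslashes added in front of
    double quotes are not themselves doubled.
    """
    escaped = text.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def periodic_repeat_call(name: str, statement: str, rate_seconds: int) -> str:
    """Build the text of a CALL apoc.periodic.repeat(name, statement, rate) query."""
    return (
        "CALL apoc.periodic.repeat("
        f"{cypher_string_literal(name)}, "
        f"{cypher_string_literal(statement)}, "
        f"{rate_seconds})"
    )


# Inner statement from the report, written with plain (unescaped) double quotes.
STATEMENT = (
    "MATCH (n:Label) "
    "WITH n.prp1 AS val1, n.prop2 AS val2 "
    'CALL streams.publish("kafka_topic", {val1: val1, val2: val2}) '
    "RETURN null"
)

print(periodic_repeat_call("process_name", STATEMENT, 20))
```

The printed query has the same shape as the ones in the report, with the inner topic name emitted as \"kafka_topic\" so the outer string literal stays intact.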

Versions

  • OS: Ubuntu 20.04
  • Neo4j: Desktop Enterprise 4.2.5
  • Neo4j-APOC: 4.2.0.5
  • Neo4j-Streams: 4.0.8
@mroiter-larus
Contributor

Hi @doctor3030,

sorry for the late reply. Have you already solved your issue?
If not, could you please share your neo4j.conf file, in particular the Streams-related part?

Regards,

Mauro

@doctor3030
Author

doctor3030 commented Nov 18, 2021

Well, I'm still using the dummy apoc.do.when workaround described above.

Here is my neo4j.conf
neo4j.conf.txt

streams.conf:
kafka.bootstrap.servers=10.0.0.120:19092,10.0.0.224:19092,10.0.0.114:19092
kafka.auto.offset.reset=earliest
kafka.group.id=neo4j
kafka.message.max.bytes=1048576000
kafka.enable.auto.commit=false
kafka.streams.async.commit=false
kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.session.timeout.ms=25000

streams.source.enabled=true
streams.sink.enabled=true
streams.sink.enabled.to.rehoboam=true
streams.source.enabled.from.rehoboam=true
streams.procedures.enabled.from.rehoboam=true
streams.source.schema.polling.interval=300000

Thanks,
Dmitry
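For reading the Streams settings shared above: keys such as streams.sink.enabled.to.rehoboam appear to follow a per-database pattern, streams.<feature>.enabled.<from|to>.<database>. Assuming that pattern, the hypothetical Python helpers below (parse_properties and per_database_toggles are illustrative names, not part of Neo4j Streams) parse a properties-style file and group those per-database toggles, which makes it quick to see which features are switched on for a given database. This is only a reading aid; it does not check the values against what Neo4j Streams actually accepts.

```python
def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and # comments."""
    settings = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:  # ignore lines that contain no '='
            settings[key.strip()] = value.strip()
    return settings


def per_database_toggles(settings: dict) -> dict:
    """Group streams.<feature>.enabled.<from|to>.<db> keys by database name."""
    toggles = {}
    for key, value in settings.items():
        parts = key.split(".")
        if (
            len(parts) == 5
            and parts[0] == "streams"
            and parts[2] == "enabled"
            and parts[3] in ("from", "to")
        ):
            feature, database = parts[1], parts[4]
            toggles.setdefault(database, {})[feature] = value.lower() == "true"
    return toggles


# The Streams-related lines from the streams.conf shared in this thread.
STREAMS_CONF = """
streams.source.enabled=true
streams.sink.enabled=true
streams.sink.enabled.to.rehoboam=true
streams.source.enabled.from.rehoboam=true
streams.procedures.enabled.from.rehoboam=true
streams.source.schema.polling.interval=300000
"""

print(per_database_toggles(parse_properties(STREAMS_CONF)))
```

Global switches such as streams.sink.enabled and unrelated keys such as streams.source.schema.polling.interval do not match the five-part pattern, so they are left out of the grouped result.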

@mroiter-larus
Contributor

Hi @doctor3030,

The issue is caused by how APOC works: apoc.periodic.repeat opens a new transaction, which is why it does not work together with Streams. If you need apoc.periodic.repeat, the workaround you already found is currently the only solution.

Regards,

Mauro
