Bug: Deadlock encountered #3987
Comments
Heya @triracle97 thanks for raising this. The live queries are going through a significant design change that will greatly reduce the chance of this deadlock happening. We are currently seeing another perf issue with 1.4.x in #3906 - it may be loosely related or at least correlated. Will have a look once 3906 is resolved.
Thanks for replying @phughk!
Unfortunately, that isn't possible - the lock is required to check which connection to send live query notifications to.
Update for this: since updating to 1.5.0, I haven't encountered this and can't reproduce it with the above method.
I encountered it again; the issue still persists, just harder to reproduce.
Thanks for the update and info - we will try to tackle this. The solution would be to have channels with messages and a single handler. It's a bit of work and we are very focused on cloud and 2.0 at the moment. But this is on our radar. Hopefully retrying can be a way around this.
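The "channels with messages and a single handler" approach mentioned above can be sketched roughly like this. This is a minimal std-only illustration, not SurrealDB's actual code: the `Command` enum, the field names, and `spawn_handler` are all hypothetical. The key idea is that one handler owns the connection map exclusively, so no shared `RwLock` exists to deadlock on; everyone else talks to it over a channel.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Hypothetical message types; SurrealDB's real types differ.
enum Command {
    Register { id: u64, name: String },
    Unregister { id: u64 },
    // Ask the handler a question and receive the answer on a reply channel.
    Lookup { id: u64, reply: mpsc::Sender<Option<String>> },
}

fn spawn_handler() -> mpsc::Sender<Command> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The map is owned exclusively by this thread: no lock, no deadlock.
        let mut connections: HashMap<u64, String> = HashMap::new();
        for cmd in rx {
            match cmd {
                Command::Register { id, name } => {
                    connections.insert(id, name);
                }
                Command::Unregister { id } => {
                    connections.remove(&id);
                }
                Command::Lookup { id, reply } => {
                    let _ = reply.send(connections.get(&id).cloned());
                }
            }
        }
    });
    tx
}

fn main() {
    let handler = spawn_handler();
    handler.send(Command::Register { id: 1, name: "ws-1".into() }).unwrap();
    let (reply_tx, reply_rx) = mpsc::channel();
    handler.send(Command::Lookup { id: 1, reply: reply_tx }).unwrap();
    println!("{:?}", reply_rx.recv().unwrap()); // prints Some("ws-1")
}
```

Because the channel serializes all access, the lookup is guaranteed to see the earlier registration, and no caller ever holds a guard across other work.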
My workmate just updated the SurrealDB core like this, and it seems to help prevent the deadlock. I'm not familiar with Rust, so I'm not sure whether it has any side effects, but I think it's good to share with you guys. I built with tag v1.4.1.
diff --git a/src/net/rpc.rs b/src/net/rpc.rs
index ddf8f86f..972543ad 100644
--- a/src/net/rpc.rs
+++ b/src/net/rpc.rs
@@ -1,5 +1,6 @@
use std::collections::BTreeMap;
use std::ops::Deref;
+use std::time::Duration;
use crate::cnf;
use crate::dbs::DB;
@@ -20,6 +21,8 @@ use axum::{
use bytes::Bytes;
use http::HeaderValue;
use http_body::Body as HttpBody;
+use once_cell::sync::Lazy;
+use tokio::sync::RwLock;
use surrealdb::dbs::Session;
use surrealdb::rpc::format::Format;
use surrealdb::rpc::format::PROTOCOLS;
@@ -65,7 +68,7 @@ async fn get_handler(
},
};
// Check if a connection with this id already exists
- if WEBSOCKETS.read().await.contains_key(&id) {
+ if try_read(&WEBSOCKETS).await.unwrap().contains_key(&id) {
return Err(Error::Request);
}
// Now let's upgrade the WebSocket connection
@@ -80,6 +83,20 @@ async fn get_handler(
.on_upgrade(move |socket| handle_socket(socket, sess, id)))
}
+async fn try_read<'a, T>(
+ data: &'a Lazy<RwLock<T>>
+) -> Result<tokio::sync::RwLockReadGuard<'a, T>, ()> {
+ loop {
+ match data.try_read() {
+ Ok(guard) => return Ok(guard),
+ Err(_) => {
+ // Lock not available, retry after a short delay
+ tokio::time::sleep(Duration::from_millis(1)).await;
+ }
+ }
+ }
+}
+
async fn handle_socket(ws: WebSocket, sess: Session, id: Uuid) {
// Check if there is a WebSocket protocol specified
let format = match ws.protocol().map(HeaderValue::to_str) {
@@ -122,3 +139,4 @@ async fn post_handler(
Err(err) => Err(Error::from(err)),
}
}
+
diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs
index adac30e0..6832d37b 100644
--- a/src/rpc/mod.rs
+++ b/src/rpc/mod.rs
@@ -1,4 +1,3 @@
-pub mod args;
pub mod connection;
pub mod failure;
pub mod format;
@@ -13,10 +12,12 @@ use once_cell::sync::Lazy;
use opentelemetry::Context as TelemetryContext;
use std::collections::HashMap;
use std::sync::Arc;
-use std::time::Duration;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
+use tokio::time::{timeout, Duration};
+use std::fmt::Debug;
+use tokio::io::AsyncReadExt;
static CONN_CLOSED_ERR: &str = "Connection closed normally";
/// A type alias for an RPC Connection
@@ -45,9 +46,9 @@ pub(crate) async fn notifications(canceller: CancellationToken) {
// Receive a notification on the channel
Ok(notification) = channel.recv() => {
// Find which WebSocket the notification belongs to
-            if let Some(id) = LIVE_QUERIES.read().await.get(&notification.id) {
+            if let Some(id) = try_read(&LIVE_QUERIES).await.unwrap().get(&notification.id) {
// Check to see if the WebSocket exists
- if let Some(rpc) = WEBSOCKETS.read().await.get(id) {
+ if let Some(rpc) = try_read(&WEBSOCKETS).await.unwrap().get(id) {
// Serialize the message to send
let message = success(None, notification);
// Add metrics
@@ -56,9 +57,11 @@ pub(crate) async fn notifications(canceller: CancellationToken) {
.with_live_id(id.to_string());
let cx = Arc::new(cx.with_value(not_ctx));
// Get the WebSocket output format
- let format = rpc.read().await.format;
+ let format = try_read2(&rpc).await.unwrap().format;
+ // let format = rpc.read().await.format;
// get the WebSocket sending channel
- let sender = rpc.read().await.channels.0.clone();
+ let sender = try_read2(&rpc).await.unwrap().channels.0.clone();
+ // let sender = rpc.read().await.channels.0.clone();
// Send the notification to the client
message.send(cx, format, &sender).await
}
@@ -72,11 +75,14 @@ pub(crate) async fn notifications(canceller: CancellationToken) {
/// Closes all WebSocket connections, waiting for graceful shutdown
pub(crate) async fn graceful_shutdown() {
// Close WebSocket connections, ensuring queued messages are processed
- for (_, rpc) in WEBSOCKETS.read().await.iter() {
- rpc.read().await.canceller.cancel();
+ // for (_, rpc) in WEBSOCKETS.read().await.iter() {
+ // rpc.read().await.canceller.cancel();
+ for (_, rpc) in try_read(&WEBSOCKETS).await.unwrap().iter() {
+ try_read2(&rpc).await.unwrap().canceller.cancel();
}
// Wait for all existing WebSocket connections to finish sending
- while WEBSOCKETS.read().await.len() > 0 {
+ while try_read(&WEBSOCKETS).await.unwrap().len() > 0 {
+ // while WEBSOCKETS.read().await.len() > 0 {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
@@ -88,3 +94,32 @@ pub(crate) fn shutdown() {
writer.drain();
}
}
+
+async fn try_read<'a, T>(
+ data: &'a Lazy<RwLock<T>>
+) -> Result<tokio::sync::RwLockReadGuard<'a, T>, ()> {
+ loop {
+ match data.try_read() {
+ Ok(guard) => return Ok(guard),
+ Err(_) => {
+ // Lock not available, retry after a short delay
+ tokio::time::sleep(Duration::from_millis(1)).await;
+ }
+ }
+ }
+}
+
+async fn try_read2<'a, T>(
+ data: &'a Arc<RwLock<T>>
+) -> Result<tokio::sync::RwLockReadGuard<'a, T>, ()> {
+ loop {
+ match data.try_read() {
+ Ok(guard) => return Ok(guard),
+ Err(_) => {
+ // Lock not available, retry after a short delay
+ tokio::time::sleep(Duration::from_millis(1)).await;
+ }
+ }
+ }
+}
+ |
Describe the bug
When I ran SurrealDB intensively with live queries, the database hung at this point in the file
src/net/rpc.rs
I used the JS SDK + React, and I tried both the SurrealDB Docker image and a self-built SurrealDB on an Ubuntu server. When I refreshed the page, the connection wasn't cleared.
Steps to reproduce
I can consistently reproduce the error with this scenario:
Start SurrealDB with this command
Then create a simple React project with
npx create-react-app my-app
Copy the following snippet to App.js
Open 4 tabs at http://localhost:3000, refresh all tabs several times, then wait around 5 minutes, then try to refresh any tab multiple times again; the WebSocket will be in a pending state (the server will hang at the above code).
Expected behaviour
I guess this is a deadlock, and it shouldn't happen.
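For context on why two tasks can hang each other like this, here is a generic illustration using std primitives, not SurrealDB's actual code: if each side acquires one lock and then waits for the lock the other side holds, neither can ever proceed. The demo below uses `try_lock` (and a barrier to force the collision deterministically) so that the would-be deadlock is observable without actually hanging; `ab_ba_collision` is a made-up name for the demo.

```rust
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

// Returns whether each side found the other's lock already held.
fn ab_ba_collision() -> (bool, bool) {
    let a = Arc::new(Mutex::new(0));
    let b = Arc::new(Mutex::new(0));
    // The barrier guarantees both sides hold their first lock
    // before either tries for the second.
    let barrier = Arc::new(Barrier::new(2));

    let (a2, b2, barrier2) = (a.clone(), b.clone(), barrier.clone());
    let t = thread::spawn(move || {
        let _ga = a2.lock().unwrap();         // side 1 holds A...
        barrier2.wait();                      // both sides now hold one lock
        let blocked = b2.try_lock().is_err(); // ...and finds B taken
        barrier2.wait();                      // keep A held until side 2 has tried it
        blocked
    });

    let _gb = b.lock().unwrap();              // side 2 holds B...
    barrier.wait();
    let blocked = a.try_lock().is_err();      // ...and finds A taken
    barrier.wait();
    (blocked, t.join().unwrap())
}

fn main() {
    // With plain lock() instead of try_lock(), both sides would block
    // forever waiting on each other: a deadlock.
    assert_eq!(ab_ba_collision(), (true, true));
}
```

The async equivalent is subtler but the same in spirit: a task that holds a read guard across an `.await` while another task is waiting to write can wedge both, which is why the maintainers' plan above moves the shared state behind a single handler.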
SurrealDB version
v1.4.2, built in x86_64-unknown-linux-gnu
Contact Details
No response