Bug: Deadlock encountered #3987

Open
2 tasks done
triracle97 opened this issue May 6, 2024 · 7 comments
Assignees: phughk
Labels: bug (Something isn't working), topic:live (This is related to live queries and push notifications)
Milestone: v2.0.0

Comments

@triracle97

triracle97 commented May 6, 2024

Describe the bug

When I ran SurrealDB intensively with live queries, the database hung at this point in src/net/rpc.rs:

if WEBSOCKETS.read().await.contains_key(&id) {
   return Err(Error::Request);
}

I used the JS SDK + React, and I tried both the SurrealDB Docker image and a self-built SurrealDB binary on an Ubuntu server. When I refreshed the page, the connection wasn't cleared.

Steps to reproduce

I can consistently reproduce the error with this scenario:

Start SurrealDB with this command:

./surreal start --log trace -u test -p test file://./testdb

Then create a simple React project with npx create-react-app my-app.

Copy the following snippet into App.js:

import logo from './logo.svg';
import './App.css';
import { Surreal } from "surrealdb.js";
import { useEffect } from "react";

const db = new Surreal({
  onError: (err) => {
    console.log('on error', err);
  },
  onClose: (err) => {
    console.log('on close', err);
  },
  onConnect: () => {
    console.log('connected')
  }
});

async function runSingleDB(tableName) {
  // await db.query(`DEFINE TABLE ${tableName}`); <-- uncomment this on first run to create the table
  const res = await db.live(tableName, () => {})
  console.log('Live query id', tableName, res);
  for (let i = 0; i < 1000; i++) {
    await db.update(`${tableName}:${i}`, {
      test: 1,
      haha: Math.random(),
    })
  }
}

async function initDb() {
  try {
    await db.connect('http://<Your server>:8000/rpc', {
      auth: {
        username: "test",
        password: "test"
      },
      namespace: 'test',
      database: 'test'
    })
  } catch (err) {
    console.log('Hi, err', err);
  }
  await runSingleDB('db1')
  await runSingleDB('db2')
  await runSingleDB('db3')
  await runSingleDB('db4')
  await runSingleDB('db5')
}

function App() {
  console.log('App')
  useEffect(() => {
    initDb()
  }, []);

  return (
    <div className="App">
      <header className="App-header">
        <img src={logo} className="App-logo" alt="logo" />
        <p>
          Edit <code>src/App.js</code> and save to reload.
        </p>
        <a
          className="App-link"
          href="https://reactjs.org"
          target="_blank"
          rel="noopener noreferrer"
        >
          Learn React
        </a>
      </header>
    </div>
  );
}

export default App;

Open 4 tabs with http://localhost:3000, refresh all tabs several times, wait around 5 minutes, then refresh any tab multiple times again. The WebSocket will stay in a pending state (the server hangs at the code above).

Expected behaviour

I believe this is a deadlock, and it shouldn't happen.

SurrealDB version

v1.4.2, built for x86_64-unknown-linux-gnu

Contact Details

No response

Is there an existing issue for this?

  • I have searched the existing issues

Code of Conduct

  • I agree to follow this project's Code of Conduct
@triracle97 triracle97 added bug Something isn't working triage This issue is new labels May 6, 2024
@phughk phughk self-assigned this May 6, 2024
@phughk phughk added topic:live This is related to live queries and push notifications and removed triage This issue is new labels May 6, 2024
@phughk phughk added this to the v2.0.0 milestone May 6, 2024
@phughk
Contributor

phughk commented May 6, 2024

Heya @triracle97, thanks for raising this. The live queries are going through a significant design change that will significantly reduce the chance of this deadlock happening. We are currently seeing another perf issue with 1.4.x in #3906 - it may be loosely related, or at least correlated. Will have a look once 3906 is resolved.

@triracle97
Author

Thanks for replying @phughk!
Is there any side effect if I comment out that line to prevent the deadlock temporarily? I don't need to reuse live queries - I can kill and recreate live queries every time I create a new connection.

@phughk
Contributor

phughk commented May 13, 2024

Unfortunately, that isn't possible - the lock is required to check which connection to send live query notifications to.

@triracle97
Author

Update on this: since updating to 1.5.0, I haven't encountered it and can't reproduce it with the method above.

@triracle97
Author

I encountered it again; the issue still persists, just harder to reproduce.

@phughk
Contributor

phughk commented May 21, 2024

Thanks for the update and info - we will try to tackle this. The solution would be to have channels with messages and a single handler. It's a bit of work, and we are very focused on cloud and 2.0 at the moment, but this is on our radar. Hopefully retrying can be a way around this for now.

@triracle97
Author

My workmate just updated the SurrealDB core like this, and it seems to help prevent the deadlock. I'm not familiar with Rust, so I'm not sure whether it has any side effects, but I thought it was worth sharing with you.

I built with tag v1.4.1.

diff --git a/src/net/rpc.rs b/src/net/rpc.rs
index ddf8f86f..972543ad 100644
--- a/src/net/rpc.rs
+++ b/src/net/rpc.rs
@@ -1,5 +1,6 @@
 use std::collections::BTreeMap;
 use std::ops::Deref;
+use std::time::Duration;

 use crate::cnf;
 use crate::dbs::DB;
@@ -20,6 +21,8 @@ use axum::{
 use bytes::Bytes;
 use http::HeaderValue;
 use http_body::Body as HttpBody;
+use once_cell::sync::Lazy;
+use tokio::sync::RwLock;
 use surrealdb::dbs::Session;
 use surrealdb::rpc::format::Format;
 use surrealdb::rpc::format::PROTOCOLS;
@@ -65,7 +68,7 @@ async fn get_handler(
 		},
 	};
 	// Check if a connection with this id already exists
-	if WEBSOCKETS.read().await.contains_key(&id) {
+	if try_read(&WEBSOCKETS).await.unwrap().contains_key(&id) {
 		return Err(Error::Request);
 	}
 	// Now let's upgrade the WebSocket connection
@@ -80,6 +83,20 @@ async fn get_handler(
 		.on_upgrade(move |socket| handle_socket(socket, sess, id)))
 }

+async fn try_read<'a, T>(
+	data: &'a Lazy<RwLock<T>>
+) -> Result<tokio::sync::RwLockReadGuard<'a, T>, ()> {
+	loop {
+		match data.try_read() {
+			Ok(guard) => return Ok(guard),
+			Err(_) => {
+				// Lock not available, retry after a short delay
+				tokio::time::sleep(Duration::from_millis(1)).await;
+			}
+		}
+	}
+}
+
 async fn handle_socket(ws: WebSocket, sess: Session, id: Uuid) {
 	// Check if there is a WebSocket protocol specified
 	let format = match ws.protocol().map(HeaderValue::to_str) {
@@ -122,3 +139,4 @@ async fn post_handler(
 		Err(err) => Err(Error::from(err)),
 	}
 }
+
diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs
index adac30e0..6832d37b 100644
--- a/src/rpc/mod.rs
+++ b/src/rpc/mod.rs
@@ -1,4 +1,3 @@
-pub mod args;
 pub mod connection;
 pub mod failure;
 pub mod format;
@@ -13,10 +12,12 @@ use once_cell::sync::Lazy;
 use opentelemetry::Context as TelemetryContext;
 use std::collections::HashMap;
 use std::sync::Arc;
-use std::time::Duration;
 use tokio::sync::RwLock;
 use tokio_util::sync::CancellationToken;
 use uuid::Uuid;
+use tokio::time::{timeout, Duration};
+use std::fmt::Debug;
+use tokio::io::AsyncReadExt;

 static CONN_CLOSED_ERR: &str = "Connection closed normally";
 /// A type alias for an RPC Connection
@@ -45,9 +46,9 @@ pub(crate) async fn notifications(canceller: CancellationToken) {
 				// Receive a notification on the channel
 				Ok(notification) = channel.recv() => {
 					// Find which WebSocket the notification belongs to
-					if let Some(id) = LIVE_QUERIES.read().await.get(&notification.id) {
+					if let Some(id) = try_read(&LIVE_QUERIES).await.unwrap().get(&notification.id) {
 						// Check to see if the WebSocket exists
-						if let Some(rpc) = WEBSOCKETS.read().await.get(id) {
+						if let Some(rpc) = try_read(&WEBSOCKETS).await.unwrap().get(id) {
 							// Serialize the message to send
 							let message = success(None, notification);
 							// Add metrics
@@ -56,9 +57,11 @@ pub(crate) async fn notifications(canceller: CancellationToken) {
 								  .with_live_id(id.to_string());
 							let cx = Arc::new(cx.with_value(not_ctx));
 							// Get the WebSocket output format
-							let format = rpc.read().await.format;
+							let format = try_read2(&rpc).await.unwrap().format;
+							// let format = rpc.read().await.format;
 							// get the WebSocket sending channel
-							let sender = rpc.read().await.channels.0.clone();
+							let sender =  try_read2(&rpc).await.unwrap().channels.0.clone();
+							// let sender = rpc.read().await.channels.0.clone();
 							// Send the notification to the client
 							message.send(cx, format, &sender).await
 						}
@@ -72,11 +75,14 @@ pub(crate) async fn notifications(canceller: CancellationToken) {
 /// Closes all WebSocket connections, waiting for graceful shutdown
 pub(crate) async fn graceful_shutdown() {
 	// Close WebSocket connections, ensuring queued messages are processed
-	for (_, rpc) in WEBSOCKETS.read().await.iter() {
-		rpc.read().await.canceller.cancel();
+	// for (_, rpc) in WEBSOCKETS.read().await.iter() {
+	// 	rpc.read().await.canceller.cancel();
+	for (_, rpc) in try_read(&WEBSOCKETS).await.unwrap().iter() {
+		try_read2(&rpc).await.unwrap().canceller.cancel();
 	}
 	// Wait for all existing WebSocket connections to finish sending
-	while WEBSOCKETS.read().await.len() > 0 {
+	while try_read(&WEBSOCKETS).await.unwrap().len() > 0 {
+	// while WEBSOCKETS.read().await.len() > 0 {
 		tokio::time::sleep(Duration::from_millis(100)).await;
 	}
 }
@@ -88,3 +94,32 @@ pub(crate) fn shutdown() {
 		writer.drain();
 	}
 }
+
+async fn try_read<'a, T>(
+	data: &'a Lazy<RwLock<T>>
+) -> Result<tokio::sync::RwLockReadGuard<'a, T>, ()> {
+	loop {
+		match data.try_read() {
+			Ok(guard) => return Ok(guard),
+			Err(_) => {
+				// Lock not available, retry after a short delay
+				tokio::time::sleep(Duration::from_millis(1)).await;
+			}
+		}
+	}
+}
+
+async fn try_read2<'a, T>(
+	data: &'a Arc<RwLock<T>>
+) -> Result<tokio::sync::RwLockReadGuard<'a, T>, ()> {
+	loop {
+		match data.try_read() {
+			Ok(guard) => return Ok(guard),
+			Err(_) => {
+				// Lock not available, retry after a short delay
+				tokio::time::sleep(Duration::from_millis(1)).await;
+			}
+		}
+	}
+}
+
