federation: parallel sending per instance #4623
base: main
Conversation
crates/federate/src/worker.rs
Outdated
```rust
let domain = self.instance.domain.clone();
tokio::spawn(async move {
  let mut report = report;
  if let Err(e) = InstanceWorker::send_retry_loop(
```
Store this in a variable first to make it more readable.
crates/federate/src/worker.rs
Outdated
```rust
activity: &SentActivity,
object: &SharedInboxActivities,
inbox_urls: Vec<Url>,
report: &mut UnboundedSender<SendActivityResult>,
initial_fail_count: i32,
```
Would make more sense to use a single shared AtomicI32 for fail count, instead of passing it back and forth like this.
It's not passed back and forth, it's only passed one way, from the worker to the send task. The issue with not passing it in explicitly is that `send_retry_loop` would then access `self`, and thus the whole `InstanceWorker` struct would have to be `Send + Sync`, which means wrapping everything in locks.
That's why I create those local variables before the lambda calling this.
Then you can share around an `Arc<AtomicI32>` instead of `self`.
The point is we don't need any atomics, so adding atomics without any real reason is bad. This is a read-only, immutable variable that is passed only one way, from the main task to each send task, so the task knows how long to sleep initially if the request fails.
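A minimal, self-contained sketch of that pattern (all names invented, not the PR's actual code): the fail count is a plain `i32` copied into the spawned task, and results flow back only through the channel.

```rust
use tokio::sync::mpsc::unbounded_channel;

#[tokio::main]
async fn main() {
  let (report, mut receive_send_result) = unbounded_channel::<i32>();
  // Read-only copy handed to the task; since it is never written from two
  // places, a plain i32 suffices and no Arc<AtomicI32> is needed.
  let initial_fail_count: i32 = 3;

  tokio::spawn(async move {
    // The task owns its copies/clones, so it never borrows the worker
    // struct, and the worker therefore doesn't need to be Send + Sync.
    let backoff_secs = 2_u64.pow(initial_fail_count as u32);
    println!("a failed send would first sleep {backoff_secs}s");
    // Report upwards; only the main task decides what to persist.
    let _ = report.send(initial_fail_count + 1);
  });

  if let Some(new_fail_count) = receive_send_result.recv().await {
    println!("send task reported fail count {new_fail_count}");
  }
}
```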
Just realizing you're referring to the channel send when you talk about "passing it back".
I see that might be an option, but I don't like it. The send task is cleanly separated and has no direct interaction with the main thread. It's nicely compartmentalized: you can now look purely at the send.rs file to understand how the send-retry loop of one activity works. It only sends notifications upwards; it never receives updates downwards after it is started.
The failure event needs to be transferred to the main task in any case so it can update the database. The main fail_count should only be updated under certain conditions, and those conditions belong in the main task, since the logic that updates the database needs the same information.
In addition, if we changed this to an atomic for the fails, then we should also change the "success" sends to direct writes into a RwLock instead of writing to a channel. Otherwise there's a weird mix where some data is sent via channel and some via atomics. But changing that would mean you'd again have to figure out how to signal the main task when to continue, and again make all interactions more complex to understand, since they can happen with arbitrary concurrency.
Alright, it seems a bit weird at first but makes sense with your explanation. Would be good if you can copy that into a code comment on `SendActivityResult`.
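Something like this sketch could carry that explanation as a doc comment (the variant shapes here are assumptions, not the actual definition):

```rust
/// Result of one activity send, reported from a send task to the instance
/// worker over a channel. Data flows strictly one way: send tasks report
/// upwards and never receive updates after being started. The worker alone
/// decides whether a failure increments the stored fail count, since the
/// same conditions also gate the database update.
pub(crate) enum SendActivityResult {
  /// The activity was delivered to all inboxes (id of the sent activity).
  Success(i64),
  /// Delivery failed; the worker applies its retry/fail-count logic.
  Failure { activity_id: i64, fail_count: i32 },
}
```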
```rust
// which can only happen by an event sent into the channel
self
  .handle_send_results(&mut receive_send_result, &mut successfuls, &mut in_flight)
  .await?;
```
I would launch a separate background task for this instead of manually checking here. Can also potentially be moved into a separate struct/file.
Maybe; that's what I originally wanted to do, but the problem then is that you need some way for the "read activities" loop to pause and unpause depending on the state of the results-handling task. That would require either another arbitrary sleep() or another channel for sending around values.
I intentionally put this inline here because then the "read activities" loop knows exactly when to continue, and nothing needs any thread synchronization. Nothing in this struct needs atomics or locks because there's only a single task accessing anything; the only thing running in parallel is the actual sends.
I agree that passing `&mut`s to local variables is pretty... weird. I originally had this part inlined; I moved it out so it's easier to grok the whole loop.
recv_many blocks automatically if there are no items. Then you can also sleep for `WORK_FINISHED_RECHECK_DELAY` if there are fewer than 4 items or so. Anyway, `UnboundedReceiver` works across threads, so I don't see why it would need any locks.
You should also be able to handle `print_stats()` from the same receiver; I don't see any reason why it should need a separate channel. Additionally, it should be possible to use a single receiver worker to write data for all instances, with no need to do it separately per instance.
Some of this may be too much effort to be worth implementing now, but at least mention the possibilities in a comment for future work.
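A rough sketch of the suggested background task, with a placeholder result type; `recv_many` is tokio's `UnboundedReceiver::recv_many`, and this illustrates the reviewer's idea rather than what the PR implements:

```rust
use tokio::sync::mpsc::UnboundedReceiver;

// Placeholder result type for illustration only.
enum SendActivityResult {
  Success(i64),
  Failure(i64),
}

async fn handle_results(mut rx: UnboundedReceiver<SendActivityResult>) {
  let mut buf = Vec::new();
  // recv_many waits until at least one item arrives, so this task blocks
  // cheaply instead of busy-polling; it returns 0 once all senders drop.
  while rx.recv_many(&mut buf, 128).await > 0 {
    for result in buf.drain(..) {
      match result {
        // update last_successful_id, fail counts, stats, etc. here
        SendActivityResult::Success(id) => println!("sent {id}"),
        SendActivityResult::Failure(id) => println!("failed {id}"),
      }
    }
  }
}
```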
> recv_many blocks automatically if there are no items

Yes, but the problem is that you want the recv to be in a separate thread from the thread that queues new sends. So how would the sending queuer know when it's supposed to send new items? It's inline here exactly because it blocks when there are no items, and thus pauses the send loop for exactly the right time until a new item should be sent. The alternatives are hacky and would require more arbitrarily chosen delays. And this event is expected to happen 10+ times per second per instance, so waiting would basically be a busy loop (sleep 10 ms, recheck, sleep 10 ms, recheck, ...).
I don't really see how this would be a possibility; moving it to a separate thread just seems all-around worse to me.
Makes sense. What about using only a single `unbounded_channel` for activity send completion and `print_stats()`?
I've split the instance worker into three separate files.
```diff
@@ -0,0 +1,149 @@
+use crate::util::LEMMY_TEST_FAST_FEDERATION;
```
As a note, I've not made any changes to this code, just moved it into a separate struct.
As mentioned in the dev chat, it would be very useful to have some unit tests here, to ensure it works as expected.
Any ideas on how best to do that? The only clean way I can think of would be abstracting all DB and HTTP interactions (I guess around 5-10 functions?) into a trait so the whole federate crate code is pure, and then mocking the DB and HTTP interactions with data from memory.
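A rough sketch of what that trait could look like, with entirely hypothetical method names (this illustrates the proposal being discussed, not actual federate crate code):

```rust
use url::Url;

#[async_trait::async_trait]
trait FederationDeps {
  /// DB side: load the next batch of activities that still need sending.
  async fn next_activities(&self, after_id: i64) -> anyhow::Result<Vec<String>>;
  /// HTTP side: deliver one serialized activity to an inbox.
  async fn send_to_inbox(&self, inbox: &Url, body: &str) -> anyhow::Result<()>;
}
// A test implementation would return canned activities from memory and
// record which inboxes it was asked to deliver to.
```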
Mocking would be too complicated and could introduce problems of its own. Look at how other tests are implemented, e.g. for db views: basically write some test data to the db, then call functions and see if they behave as expected. You can start a local server with an inbox route to check that activities are received (with an instance like
I realized that there is a better criterion for parallel sending than grouping by community id or post id: we can send activities in parallel as long as they have different actors. So the algorithm would be something like this:
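As a rough sketch of that criterion (all types and names here are assumptions for illustration, not the PR's actual code):

```rust
use std::collections::HashSet;

struct Activity {
  id: i64,
  actor_apub_id: String,
}

/// Pick activities that may go out concurrently: activities from different
/// actors commute, but two activities from the same actor must stay in
/// order, so an actor that already has a send in flight is skipped.
fn next_parallel_batch<'a>(
  queue: &'a [Activity],
  in_flight_actors: &HashSet<String>,
  limit: usize,
) -> Vec<&'a Activity> {
  let mut batch = Vec::new();
  let mut claimed: HashSet<&str> = HashSet::new();
  for a in queue {
    if batch.len() >= limit {
      break;
    }
    if in_flight_actors.contains(&a.actor_apub_id) || !claimed.insert(&a.actor_apub_id) {
      continue; // same actor already sending: preserve per-actor order
    }
    batch.push(a);
  }
  batch
}
```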
So after updating to the main branch and a few smaller changes, the federation tests now actually pass (seemingly reliably) with the default configuration of concurrent_sends_per_instance=1. I've marked this PR as ready for review. In the latest commits I've changed the CI to actually run the federation tests twice (with a few necessary changes to make that work). The reason is that it's pretty tedious to always have to fully reset the databases in order to run the federation tests, so I think it makes sense to make them not dependent on running on fully empty databases.
```ts
let user = await registerUser(
  alpha,
  alphaUrl,
  "تجريب" + Math.random().toString().slice(2, 5),
```
Use the milliseconds since the epoch to make it simpler and more likely to produce a unique username:
"تجريب" + Math.random().toString().slice(2, 5), | |
"تجريب" + Date.now(), |
```diff
@@ -214,6 +229,7 @@ mod test {
     .app_data(context.clone())
     .build()
     .await?;
+  let federation_worker_config = FederationWorkerConfig::default(); // TODO
```
What
```rust
// handle_send_results does not guarantee that we are now in a condition where we want to
// send a new one, so repeat this check until the if no longer applies
continue;
} else {
```
This `else` is redundant because of the `continue`.
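A tiny standalone illustration of why (a made-up loop, not the PR's code):

```rust
fn drain(mut pending: Vec<i64>) {
  while let Some(id) = pending.pop() {
    if id % 2 == 0 {
      // `continue` already jumps back to the loop head, so everything
      // below is skipped and no `else` block is needed around it
      continue;
    }
    println!("sending activity {id}");
  }
}
```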
```rust
let last_successful_id = self.state.last_successful_id.map(|e| e.0).context(
  "impossible: id is initialized in get_latest_ids and never returned to None",
)?;
let expected_next_id = last_successful_id + (successfuls.len() as i64) + in_flight + 1;
// compare to next id based on incrementing
if expected_next_id != next_id_to_send.0 {
```
```suggestion
let expected_next_id = self
  .state
  .last_successful_id
  .map(|e| e.0 + (successfuls.len() as i64) + in_flight + 1);
// compare to next id based on incrementing
if expected_next_id != Some(next_id_to_send.0) {
```
```rust
if let Some(last) = self.state.last_successful_id {
  Ok((last, latest_id))
```
```suggestion
let last = if let Some(last) = self.state.last_successful_id {
  last
```
```rust
// instance skip all past activities:
```
```suggestion
// instance
// skip all past activities:
```
```rust
Ok((latest_id, latest_id))
}
```
```suggestion
latest_id
}
Ok((last, latest_id))
```
Currently, with the implementation of the federation queue from #3605 that was enabled in 0.19, federation is parallel across instances but sequential per receiving instance.
This means that the maximum throughput of activities is limited by the network latency between instances, as well as the internal latency of processing a single activity.
There is an extensive discussion here: #4529 (comment), though that issue itself is only about one sub-problem.
This PR changes the federation sending component to send activities in parallel, with a configurable maximum concurrency of 8. The implementation is more complex than I expected, since we need to keep track of last_successful_id (which must be the highest activity id below which every single activity has been sent successfully), and we need to track failure counts without immediately jumping to hour-long retry delays when 8 concurrent sends fail simultaneously.
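A sketch of the bookkeeping that makes last_successful_id safe under out-of-order completions (assuming dense activity ids; names are illustrative, not the PR's actual code):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

struct SendState {
  last_successful_id: i64,
  // completed ids that are not yet contiguous with last_successful_id
  successfuls: BinaryHeap<Reverse<i64>>,
}

impl SendState {
  /// Record a completed send and advance last_successful_id as far as the
  /// run of contiguous ids allows. Only this value may be persisted: a
  /// higher id with gaps below it could skip unsent activities on restart.
  fn mark_success(&mut self, id: i64) {
    self.successfuls.push(Reverse(id));
    while self.successfuls.peek() == Some(&Reverse(self.last_successful_id + 1)) {
      self.successfuls.pop();
      self.last_successful_id += 1;
    }
  }
}
```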
The implementation roughly works as follows:
In order for the results of this to still be correct, fixes need to be applied to make all activities commutative (as discussed above).
It should also be possible to make the concurrency happen only when necessary, since for most instance-to-instance connections it is not needed, which would reduce the ordering issue. That is not implemented here though.
Currently, this PR fails the federation tests. I think this is due both to a bug somewhere and to the ordering problem.