feat: Sharding allocation strategy based on slice ranges #32418
base: main
Conversation
* for the database sharding we want to reduce the number of db connections when using many Akka nodes
* allocate entity sharding by slice ranges, which is also used by the database sharding
* thereby the db connections from one Akka node will go to one database, instead of to all
val regionsByMbr = regionsByMember(currentShardAllocations.keySet)
val regions = regionsByMbr.keysIterator.toIndexedSeq.sorted(Member.ageOrdering).map(regionsByMbr(_))
val rangeSize = NumberOfSlices / regions.size
val i = slice / rangeSize
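The naive range computation above can be sketched as a standalone function; this is a hypothetical reconstruction (the name `regionIndexFor` is mine, not from the diff):

```scala
// Hypothetical standalone sketch of the naive first approach: the 1024 slices
// are split into equal contiguous ranges, one per region, regions sorted by age.
val NumberOfSlices = 1024

def regionIndexFor(slice: Int, numberOfRegions: Int): Int = {
  val rangeSize = NumberOfSlices / numberOfRegions
  // clamp so the rounding remainder of the last range stays on the last region
  math.min(slice / rangeSize, numberOfRegions - 1)
}
```

For example, with 3 regions the rangeSize is 341, so slice 1023 would index past the last region without the clamp.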
A problem with this first naive approach is that adding/removing nodes will reshuffle many shards. Consistent hashing would be nice, but since it's (dynamic) ranges I don't see how that can be used. I have an idea that it can instead find existing adjacent shards and prefer allocation to same region.
Haven't tracked what the updated algorithm is yet, but feels like this issue could always be there in some form.
The optimal allocation (however it ends up there) will have all the shards for a slice range together. When these need to be reallocated, the optimal is to allocate as many as possible to one node again, but that either leads to reshuffles, or to accepting unbalanced distributions, or the shards need to be redistributed over the other nodes, creating fragmentation of the slice range.
But maybe in practice the find-neighbours approach naturally ends up balancing the tradeoff over time?
* to avoid too much reshuffling when adding/removing members
@johanandren @pvlugter I have something with decent results for the simulations. Can you take a look before I continue with tuning and real tests.
Looking good so far.
// This covers the rounding case for the last region, which we just distribute over all regions.
// May also happen if member for that region has been removed, but that should be a rare case.
Suggested change ("last region" → "last slice" in the first line):
// This covers the rounding case for the last slice, which we just distribute over all regions.
// May also happen if member for that region has been removed, but that should be a rare case.
val overfill = 2
val maxShards = (NumberOfSlices / currentShardAllocations.size) + overfill
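The cap above is simple enough to sketch in isolation (assuming `currentShardAllocations.size` is the number of regions):

```scala
// Sketch of the allocation cap: each region takes at most its fair share of the
// 1024 slices plus a small overfill, so that the neighbor preference cannot
// keep piling shards onto the same node.
val NumberOfSlices = 1024

def maxShards(numberOfRegions: Int, overfill: Int = 2): Int =
  NumberOfSlices / numberOfRegions + overfill
```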
// FIXME take a look at ShardSuitabilityOrdering for member status and appVersion preference
This seems quite important, or I think it will try to stick to allocating to old nodes until quite a number of cluster nodes have rolled, as it prefers neighbours. Maybe I'm missing something with the maxShards protecting against that?
  emptyRebalanceResult
} else {
  // this is the number of slices per region that we are aiming for
  val overfill = 1
Should it align with the overfill on allocation? (just the 1 in diff but seems strange)
val overfill = 1
val targetSize = NumberOfSlices / sortedRegionEntries.size + overfill
val selected = Vector.newBuilder[ShardId]
// FIXME ShardSuitabilityOrdering isn't used, but it seems better to use most shards first, combine them?
Missing the handling of leaving regions and app version from there because there if we don't want to use it
Lol, that sentence was not parseable, just wanted to note that the ShardSuitabilityOrdering looks at member leaving and app version, so if we don't want to use it we should probably do something around those properties as well.
This reverts commit 1d5d05b.
findRegionWithNeighbor(slice, sortedRegionEntries) match {
  case Some(regionWithNeighbor) =>
    Future.successful(regionWithNeighbor)
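A much-simplified, hypothetical version of the neighbor lookup (the real `findRegionWithNeighbor` works on sorted region entries; here `allocations` simply maps region names to their allocated slices):

```scala
// Simplified sketch of the neighbor preference: search outward from the slice
// at increasing distance and prefer the region owning the first allocated
// neighbor slice found. Returns None when no neighbor is allocated yet.
def findRegionWithNeighbor(
    slice: Int,
    maxDistance: Int,
    allocations: Map[String, Set[Int]]): Option[String] = {
  def regionOf(s: Int): Option[String] =
    allocations.collectFirst { case (region, slices) if slices.contains(s) => region }

  (1 to maxDistance).iterator
    .flatMap(d => Iterator(slice - d, slice + d))
    .filter(s => s >= 0 && s < 1024)
    .flatMap(s => regionOf(s).iterator)
    .nextOption()
}
```

Note that lower neighbors are tried before upper ones at each distance; the real implementation may break that tie differently.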
I had an idea that I thought would be good, and implemented it in 1d5d05b. Reverted because "try distributions" didn't show an improvement in the reduction of connections. The idea was to also look at the already allocated range from min to max slice in that region, and if the slice is outside of the optimal range, try to find a region with a lower/upper slice neighbor instead.
I'm still puzzled that this didn't work, maybe I made some mistake in the implementation. I might debug it a little more.
// These are not real tests, but can be useful for exploring the algorithm and tuning
"SliceRangeShardAllocationStrategy simulations" must {

  "try distributions" ignore {
A few examples of the result:
total of 73 connections from 20 nodes to 8 backend ranges, reduction by 87
total of 88 connections from 20 nodes to 16 backend ranges, reduction by 232
total of 84 connections from 50 nodes to 8 backend ranges, reduction by 316
total of 98 connections from 50 nodes to 16 backend ranges, reduction by 702
total of 125 connections from 100 nodes to 8 backend ranges, reduction by 675
total of 141 connections from 100 nodes to 16 backend ranges, reduction by 1459
And "connection" here should be read as a db connection pool, so multiply by a factor of 10, or whatever the pool size is.
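Reading the numbers above: the reduction is relative to the worst case where every node connects to every slice range (e.g. 20 nodes × 8 ranges = 160 links, down to 73). That arithmetic in a small sketch:

```scala
// The simulation counts node-to-range links. Without any slice affinity every
// node links to every backend range; actual db connections are links × pool size.
def worstCaseLinks(nodes: Int, ranges: Int): Int = nodes * ranges
def totalDbConnections(links: Int, poolSize: Int): Int = links * poolSize
```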
Ok, so for the numbers we've been testing with (around 25 nodes and 8 slice ranges) it's a decent improvement, but still over 3x the optimal number of connections. And for higher number of nodes it's easier to get affinity.
Let's say a connection pool of max 20, then 73*20/8 ≈ 183 connections per db should still be fine.
I'd say that it's more important for the larger clusters, so good that it looks better for that.
One idea that I have been pondering is if we should have a pre-allocation phase where it would allocate all shards in order 0-1023. Then it would be near perfect affinity. Would have to be triggered from outside the allocation strategy itself, in the coordinator, or by something asking the coordinator for shard homes. Could be triggered when reaching min-nr-of-members.
However, it would only help for the initial allocation and not for rebalance.
I made a small change that had a significant improvement for the smaller clusters: I adjust how far to look for neighbors depending on the optimal range size, where previously it was hardcoded to 10. bc9f86d
New results:
total of 46 connections from 20 nodes to 8 backend ranges, reduction by 114
total of 67 connections from 20 nodes to 16 backend ranges, reduction by 253
total of 84 connections from 50 nodes to 8 backend ranges, reduction by 316
total of 98 connections from 50 nodes to 16 backend ranges, reduction by 702
total of 124 connections from 100 nodes to 8 backend ranges, reduction by 676
total of 138 connections from 100 nodes to 16 backend ranges, reduction by 1462
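The change in bc9f86d ties the neighbor search distance to the optimal range size instead of a fixed 10; the exact formula below is an assumption for illustration, not taken from the commit:

```scala
// Hypothetical sketch: derive the neighbor search distance from the optimal
// range size (1024 / number of regions). Assumption: search half the optimal
// range in each direction, with a floor of 1.
val NumberOfSlices = 1024

def neighborSearchDistance(numberOfRegions: Int): Int =
  math.max(1, NumberOfSlices / numberOfRegions / 2)
```

With 8 regions this searches much further (64) than the old fixed 10, which matches the bigger improvement seen for the smaller clusters.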
Cool. Makes sense that it has more opportunity to coalesce with looking further.
Looking pretty good, esp for the bigger cluster sizes when running the "try distributions" case.
override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId

override def shardId(entityId: String): String = {
  // FIXME shall we have the Persistence extension dependency here, or re-implement sliceForPersistenceId?
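If re-implementing rather than depending on the Persistence extension, the slice function is small; this sketch mirrors (as far as I know) Akka's definition, so treat it as an assumption to verify against the extension:

```scala
// Sketch of re-implementing sliceForPersistenceId instead of taking a
// dependency on the Persistence extension. Assumed to mirror Akka's
// definition: abs(persistenceId.hashCode % 1024).
def sliceForPersistenceId(persistenceId: String): Int =
  math.abs(persistenceId.hashCode % 1024)

// The extractor's shardId is then simply the slice rendered as a string.
def shardId(entityId: String): String = sliceForPersistenceId(entityId).toString
```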
Optional dependency on persistence seems fine to me.
I was thinking that this is only useful together with database sharding, i.e. persistence included. I was even thinking that it should only be documented in https://doc.akka.io/docs/akka-persistence-r2dbc/current/data-partition.html
I agree that makes sense
if (slice >= numberOfSlices)
  throw new IllegalArgumentException("slice must be between 0 and 1023. Use `ShardBySliceMessageExtractor`.")

val sortedRegionEntries = regionEntriesFor(currentShardAllocations).toVector.sorted(shardSuitabilityOrdering)
👍 nice, covers leaving and app version without repeating that logic
yes, had to make a parameter for using least shards or not in the ordering
// prefer the node with the least allocated shards
JInteger.compare(allocatedShardsX.size, allocatedShardsY.size)
} else if (x.member.upNumber != y.member.upNumber) {
  // prefer older
  Member.ageOrdering.compare(x.member, y.member)
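Condensed into a hypothetical standalone form (in Akka a lower `upNumber` means an older member), the ordering being discussed amounts to:

```scala
// Hypothetical condensed form of the suitability ordering: fewest allocated
// shards first, then oldest member (lowest upNumber) as a deterministic tie-break.
final case class RegionEntry(upNumber: Int, allocatedShards: Int)

val shardSuitabilityOrdering: Ordering[RegionEntry] =
  Ordering.by(e => (e.allocatedShards, e.upNumber))
```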
Why prefer older? Not saying younger or ignoring age would be better, but want to understand the rationale. (I understand this is the case where nodes have same version so not rolling)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No strong preference, mostly just wanted to have a deterministic order. Possibly that a younger could be more "cold" and shouldn't be overloaded by many new shards immediately.
val currentNumberOfShards = sortedRegionEntries.map(_.shardIds.size).sum
val limitedResult = result.take(limit(currentNumberOfShards)).toSet
previousRebalance = previousRebalance.union(limitedResult)
if (previousRebalance.size >= numberOfSlices / 4)
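A pure sketch of the guard being discussed, with the state passed explicitly (in the diff `previousRebalance` is a field and the threshold is numberOfSlices / 4 = 256):

```scala
// Hypothetical pure form of the rebalance guard: cap how many shards one round
// may move, and move nothing once a quarter of the 1024 slices has been
// rebalanced recently, to avoid rebalance loops.
val numberOfSlices = 1024

def rebalanceRound(
    candidates: Vector[String],
    limit: Int,
    previousRebalance: Set[String]): (Set[String], Set[String]) =
  if (previousRebalance.size >= numberOfSlices / 4) (Set.empty, previousRebalance)
  else {
    val limited = candidates.take(limit).toSet
    (limited, previousRebalance union limited)
  }
```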
Did it trigger rebalance-loop with the previous lower value (100 I think)?
No exact science around that choice. In some simulations I could see loops with 100, which looked better with 256.
Some API docs and maybe some cleanup remaining, but marking as ready.
LGTM. Good starting point to improve things for number of connections 👍🏼
The tension between distribution and affinity feels fundamental to the problem here, and there will likely always be some tradeoffs to make. Maybe it's useful to have an option that triggers more reshuffling to get closer to optimal affinity, for clusters where membership changes are not expected to happen often and more movement during changes is acceptable before settling again? Or to retain the optimal affinity allocation after a rolling update (as opposed to cluster size changes).
Some extra thoughts (not for now): wonder if there's potential for this to become a general affinity-based allocation strategy, where some affinity function is provided. And thinking about whether there's a hashing + affinity/preference approach that would be useful here (compared with searching for neighbours / high affinity shards).
LGTM!
Early draft so far.