1pub mod router;
2pub mod task_handles;
3#[cfg(not(target_family = "wasm"))]
4mod tokio_runtime;
5
6use std::{
7 any::{Any, TypeId, type_name},
8 hash::Hash,
9 marker::PhantomData,
10 sync::{Arc, OnceLock},
11};
12
13use dashmap::{DashMap, mapref::entry::Entry};
14use parking_lot::RwLock;
15
16#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
18pub enum ShardStateLifeCycle {
19 Scope,
21 Shard,
23}
24
25pub(crate) type ErasedShardState = dyn Any + Send + Sync;
26pub(crate) type ErasedShardStateHandle = Arc<ErasedShardState>;
27
28#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
29pub(crate) struct ShardStateSlot {
30 slot: u32,
31 generation: u64,
32}
33
34struct ShardSlotEntry {
35 generation: u64,
36 type_id: TypeId,
37 value: Option<ErasedShardStateHandle>,
38}
39
40#[derive(Default)]
41struct ShardSlotTable {
42 entries: Vec<ShardSlotEntry>,
43 free_list: Vec<u32>,
44}
45
46static SHARD_SLOT_TABLE: OnceLock<RwLock<ShardSlotTable>> = OnceLock::new();
47
48fn shard_slot_table() -> &'static RwLock<ShardSlotTable> {
49 SHARD_SLOT_TABLE.get_or_init(|| RwLock::new(ShardSlotTable::default()))
50}
51
52pub struct ShardState<T> {
58 slot: u32,
59 generation: u64,
60 _marker: PhantomData<T>,
61}
62
63impl<T> Copy for ShardState<T> {}
64
65impl<T> Clone for ShardState<T> {
66 fn clone(&self) -> Self {
67 *self
68 }
69}
70
71impl<T> PartialEq for ShardState<T> {
72 fn eq(&self, other: &Self) -> bool {
73 self.slot == other.slot && self.generation == other.generation
74 }
75}
76
77impl<T> Eq for ShardState<T> {}
78
79impl<T> std::hash::Hash for ShardState<T> {
80 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
81 self.slot.hash(state);
82 self.generation.hash(state);
83 }
84}
85
86impl<T> ShardState<T> {
87 fn from_slot(slot: ShardStateSlot) -> Self {
88 Self {
89 slot: slot.slot,
90 generation: slot.generation,
91 _marker: PhantomData,
92 }
93 }
94}
95
96impl<T> ShardState<T>
97where
98 T: Send + Sync + 'static,
99{
100 fn load_entry(&self) -> ErasedShardStateHandle {
101 let table = shard_slot_table().read();
102 let entry = table
103 .entries
104 .get(self.slot as usize)
105 .unwrap_or_else(|| panic!("ShardState points to freed slot: {}", self.slot));
106
107 if entry.generation != self.generation {
108 panic!(
109 "ShardState is stale (slot {}, generation {}, current generation {})",
110 self.slot, self.generation, entry.generation
111 );
112 }
113
114 if entry.type_id != TypeId::of::<T>() {
115 panic!(
116 "ShardState type mismatch for slot {}: expected {}, stored {:?}",
117 self.slot,
118 type_name::<T>(),
119 entry.type_id
120 );
121 }
122
123 entry
124 .value
125 .as_ref()
126 .unwrap_or_else(|| panic!("ShardState slot {} has been recycled", self.slot))
127 .clone()
128 }
129
130 fn load_lock(&self) -> Arc<RwLock<T>> {
131 self.load_entry()
132 .downcast::<RwLock<T>>()
133 .unwrap_or_else(|_| panic!("ShardState slot {} downcast failed", self.slot))
134 }
135
136 pub fn with<R>(&self, f: impl FnOnce(&T) -> R) -> R {
138 let lock = self.load_lock();
139 let guard = lock.read();
140 f(&guard)
141 }
142
143 pub fn with_mut<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
145 let lock = self.load_lock();
146 let mut guard = lock.write();
147 f(&mut guard)
148 }
149
150 pub fn get(&self) -> T
152 where
153 T: Clone,
154 {
155 self.with(Clone::clone)
156 }
157
158 pub fn set(&self, value: T) {
160 self.with_mut(|slot| *slot = value);
161 }
162}
163
164fn alloc_shard_state_slot<T>() -> ShardStateSlot
165where
166 T: Default + Send + Sync + 'static,
167{
168 let mut table = shard_slot_table().write();
169 let type_id = TypeId::of::<T>();
170
171 if let Some(slot) = table.free_list.pop() {
172 let entry = table
173 .entries
174 .get_mut(slot as usize)
175 .expect("shard slot entry should exist");
176 entry.type_id = type_id;
177 entry.value = Some(Arc::new(RwLock::new(T::default())));
178 return ShardStateSlot {
179 slot,
180 generation: entry.generation,
181 };
182 }
183
184 let slot = table.entries.len() as u32;
185 table.entries.push(ShardSlotEntry {
186 generation: 0,
187 type_id,
188 value: Some(Arc::new(RwLock::new(T::default()))),
189 });
190 ShardStateSlot {
191 slot,
192 generation: 0,
193 }
194}
195
196fn assert_slot_type_for<T>(slot: ShardStateSlot, shard_id: &str, storage_label: &str)
197where
198 T: Send + Sync + 'static,
199{
200 let table = shard_slot_table().read();
201 let entry = table.entries.get(slot.slot as usize).unwrap_or_else(|| {
202 panic!(
203 "shard state slot {} for `{}` in {} storage is missing",
204 slot.slot, shard_id, storage_label
205 )
206 });
207
208 if entry.generation != slot.generation {
209 panic!(
210 "shard state for `{}` in {} storage is stale (slot {}, generation {}, current generation {})",
211 shard_id, storage_label, slot.slot, slot.generation, entry.generation
212 );
213 }
214
215 if entry.value.is_none() {
216 panic!(
217 "shard state for `{}` in {} storage has been recycled (slot {})",
218 shard_id, storage_label, slot.slot
219 );
220 }
221
222 if entry.type_id != TypeId::of::<T>() {
223 panic!(
224 "shard state type mismatch for `{}` in {} storage: expected {}",
225 shard_id,
226 storage_label,
227 type_name::<T>()
228 );
229 }
230}
231
232pub(crate) fn recycle_shard_state_slot(slot: ShardStateSlot) {
233 let mut table = shard_slot_table().write();
234 let Some(entry) = table.entries.get_mut(slot.slot as usize) else {
235 return;
236 };
237 if entry.generation != slot.generation {
238 return;
239 }
240 entry.value = None;
241 entry.generation = entry.generation.wrapping_add(1);
242 table.free_list.push(slot.slot);
243}
244
245pub(crate) type ShardStateMap<K> = DashMap<K, ShardStateSlot>;
246
247pub(crate) fn init_or_get_shard_state_in_map<K, T, F, R>(
248 map: &ShardStateMap<K>,
249 key: K,
250 shard_id: &str,
251 storage_label: &str,
252 f: F,
253) -> R
254where
255 K: Eq + Hash,
256 T: Default + Send + Sync + 'static,
257 F: FnOnce(ShardState<T>) -> R,
258{
259 let slot = match map.entry(key) {
260 Entry::Occupied(entry) => *entry.get(),
261 Entry::Vacant(entry) => {
262 let value = alloc_shard_state_slot::<T>();
263 entry.insert(value);
264 value
265 }
266 };
267
268 assert_slot_type_for::<T>(slot, shard_id, storage_label);
269 f(ShardState::from_slot(slot))
270}