tessera_shard/
lib.rs

1pub mod router;
2pub mod task_handles;
3#[cfg(not(target_family = "wasm"))]
4mod tokio_runtime;
5
6use std::{
7    any::{Any, TypeId, type_name},
8    hash::Hash,
9    marker::PhantomData,
10    sync::{Arc, OnceLock},
11};
12
13use dashmap::{DashMap, mapref::entry::Entry};
14use parking_lot::RwLock;
15
16/// Describes the lifecycle of this shard state.
17#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
18pub enum ShardStateLifeCycle {
19    /// State exists for the lifetime of one router controller instance.
20    Scope,
21    /// State exists for the lifetime of a route instance.
22    Shard,
23}
24
25pub(crate) type ErasedShardState = dyn Any + Send + Sync;
26pub(crate) type ErasedShardStateHandle = Arc<ErasedShardState>;
27
28#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
29pub(crate) struct ShardStateSlot {
30    slot: u32,
31    generation: u64,
32}
33
34struct ShardSlotEntry {
35    generation: u64,
36    type_id: TypeId,
37    value: Option<ErasedShardStateHandle>,
38}
39
40#[derive(Default)]
41struct ShardSlotTable {
42    entries: Vec<ShardSlotEntry>,
43    free_list: Vec<u32>,
44}
45
46static SHARD_SLOT_TABLE: OnceLock<RwLock<ShardSlotTable>> = OnceLock::new();
47
48fn shard_slot_table() -> &'static RwLock<ShardSlotTable> {
49    SHARD_SLOT_TABLE.get_or_init(|| RwLock::new(ShardSlotTable::default()))
50}
51
52/// Typed shard state handle.
53///
54/// This is a lightweight `Copy` handle backed by shard-managed storage.
55/// The actual value is hosted in a global slot table and validated via
56/// generation to prevent ABA stale access.
57pub struct ShardState<T> {
58    slot: u32,
59    generation: u64,
60    _marker: PhantomData<T>,
61}
62
63impl<T> Copy for ShardState<T> {}
64
65impl<T> Clone for ShardState<T> {
66    fn clone(&self) -> Self {
67        *self
68    }
69}
70
71impl<T> PartialEq for ShardState<T> {
72    fn eq(&self, other: &Self) -> bool {
73        self.slot == other.slot && self.generation == other.generation
74    }
75}
76
77impl<T> Eq for ShardState<T> {}
78
79impl<T> std::hash::Hash for ShardState<T> {
80    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
81        self.slot.hash(state);
82        self.generation.hash(state);
83    }
84}
85
86impl<T> ShardState<T> {
87    fn from_slot(slot: ShardStateSlot) -> Self {
88        Self {
89            slot: slot.slot,
90            generation: slot.generation,
91            _marker: PhantomData,
92        }
93    }
94}
95
96impl<T> ShardState<T>
97where
98    T: Send + Sync + 'static,
99{
100    fn load_entry(&self) -> ErasedShardStateHandle {
101        let table = shard_slot_table().read();
102        let entry = table
103            .entries
104            .get(self.slot as usize)
105            .unwrap_or_else(|| panic!("ShardState points to freed slot: {}", self.slot));
106
107        if entry.generation != self.generation {
108            panic!(
109                "ShardState is stale (slot {}, generation {}, current generation {})",
110                self.slot, self.generation, entry.generation
111            );
112        }
113
114        if entry.type_id != TypeId::of::<T>() {
115            panic!(
116                "ShardState type mismatch for slot {}: expected {}, stored {:?}",
117                self.slot,
118                type_name::<T>(),
119                entry.type_id
120            );
121        }
122
123        entry
124            .value
125            .as_ref()
126            .unwrap_or_else(|| panic!("ShardState slot {} has been recycled", self.slot))
127            .clone()
128    }
129
130    fn load_lock(&self) -> Arc<RwLock<T>> {
131        self.load_entry()
132            .downcast::<RwLock<T>>()
133            .unwrap_or_else(|_| panic!("ShardState slot {} downcast failed", self.slot))
134    }
135
136    /// Execute a closure with a shared reference to the value.
137    pub fn with<R>(&self, f: impl FnOnce(&T) -> R) -> R {
138        let lock = self.load_lock();
139        let guard = lock.read();
140        f(&guard)
141    }
142
143    /// Execute a closure with a mutable reference to the value.
144    pub fn with_mut<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
145        let lock = self.load_lock();
146        let mut guard = lock.write();
147        f(&mut guard)
148    }
149
150    /// Get a cloned value. Requires `T: Clone`.
151    pub fn get(&self) -> T
152    where
153        T: Clone,
154    {
155        self.with(Clone::clone)
156    }
157
158    /// Replace the stored value.
159    pub fn set(&self, value: T) {
160        self.with_mut(|slot| *slot = value);
161    }
162}
163
164fn alloc_shard_state_slot<T>() -> ShardStateSlot
165where
166    T: Default + Send + Sync + 'static,
167{
168    let mut table = shard_slot_table().write();
169    let type_id = TypeId::of::<T>();
170
171    if let Some(slot) = table.free_list.pop() {
172        let entry = table
173            .entries
174            .get_mut(slot as usize)
175            .expect("shard slot entry should exist");
176        entry.type_id = type_id;
177        entry.value = Some(Arc::new(RwLock::new(T::default())));
178        return ShardStateSlot {
179            slot,
180            generation: entry.generation,
181        };
182    }
183
184    let slot = table.entries.len() as u32;
185    table.entries.push(ShardSlotEntry {
186        generation: 0,
187        type_id,
188        value: Some(Arc::new(RwLock::new(T::default()))),
189    });
190    ShardStateSlot {
191        slot,
192        generation: 0,
193    }
194}
195
196fn assert_slot_type_for<T>(slot: ShardStateSlot, shard_id: &str, storage_label: &str)
197where
198    T: Send + Sync + 'static,
199{
200    let table = shard_slot_table().read();
201    let entry = table.entries.get(slot.slot as usize).unwrap_or_else(|| {
202        panic!(
203            "shard state slot {} for `{}` in {} storage is missing",
204            slot.slot, shard_id, storage_label
205        )
206    });
207
208    if entry.generation != slot.generation {
209        panic!(
210            "shard state for `{}` in {} storage is stale (slot {}, generation {}, current generation {})",
211            shard_id, storage_label, slot.slot, slot.generation, entry.generation
212        );
213    }
214
215    if entry.value.is_none() {
216        panic!(
217            "shard state for `{}` in {} storage has been recycled (slot {})",
218            shard_id, storage_label, slot.slot
219        );
220    }
221
222    if entry.type_id != TypeId::of::<T>() {
223        panic!(
224            "shard state type mismatch for `{}` in {} storage: expected {}",
225            shard_id,
226            storage_label,
227            type_name::<T>()
228        );
229    }
230}
231
232pub(crate) fn recycle_shard_state_slot(slot: ShardStateSlot) {
233    let mut table = shard_slot_table().write();
234    let Some(entry) = table.entries.get_mut(slot.slot as usize) else {
235        return;
236    };
237    if entry.generation != slot.generation {
238        return;
239    }
240    entry.value = None;
241    entry.generation = entry.generation.wrapping_add(1);
242    table.free_list.push(slot.slot);
243}
244
245pub(crate) type ShardStateMap<K> = DashMap<K, ShardStateSlot>;
246
247pub(crate) fn init_or_get_shard_state_in_map<K, T, F, R>(
248    map: &ShardStateMap<K>,
249    key: K,
250    shard_id: &str,
251    storage_label: &str,
252    f: F,
253) -> R
254where
255    K: Eq + Hash,
256    T: Default + Send + Sync + 'static,
257    F: FnOnce(ShardState<T>) -> R,
258{
259    let slot = match map.entry(key) {
260        Entry::Occupied(entry) => *entry.get(),
261        Entry::Vacant(entry) => {
262            let value = alloc_shard_state_slot::<T>();
263            entry.insert(value);
264            value
265        }
266    };
267
268    assert_slot_type_for::<T>(slot, shard_id, storage_label);
269    f(ShardState::from_slot(slot))
270}