rusteron_code_gen/
common.rs

1use crate::AeronErrorType::Unknown;
2#[cfg(feature = "backtrace")]
3use std::backtrace::Backtrace;
4use std::cell::UnsafeCell;
5use std::fmt::Formatter;
6use std::mem::MaybeUninit;
7use std::ops::{Deref, DerefMut};
8
9pub enum CResource<T> {
10    OwnedOnHeap(std::rc::Rc<ManagedCResource<T>>),
11    /// stored on stack, unsafe, use with care
12    OwnedOnStack(std::mem::MaybeUninit<T>),
13    Borrowed(*mut T),
14}
15
16impl<T: Clone> Clone for CResource<T> {
17    fn clone(&self) -> Self {
18        unsafe {
19            match self {
20                CResource::OwnedOnHeap(r) => CResource::OwnedOnHeap(r.clone()),
21                CResource::OwnedOnStack(r) => {
22                    CResource::OwnedOnStack(MaybeUninit::new(r.assume_init_ref().clone()))
23                }
24                CResource::Borrowed(r) => CResource::Borrowed(r.clone()),
25            }
26        }
27    }
28}
29
30impl<T> CResource<T> {
31    #[inline]
32    pub fn get(&self) -> *mut T {
33        match self {
34            CResource::OwnedOnHeap(r) => r.get(),
35            CResource::OwnedOnStack(r) => r.as_ptr() as *mut T,
36            CResource::Borrowed(r) => *r,
37        }
38    }
39
40    #[inline]
41    // to prevent the dependencies from being dropped as you have a copy here
42    pub fn add_dependency<D: std::any::Any>(&self, dep: D) {
43        match self {
44            CResource::OwnedOnHeap(r) => r.add_dependency(dep),
45            CResource::OwnedOnStack(_) | CResource::Borrowed(_) => {
46                unreachable!("only owned on heap")
47            }
48        }
49    }
50    #[inline]
51    pub fn get_dependency<V: Clone + 'static>(&self) -> Option<V> {
52        match self {
53            CResource::OwnedOnHeap(r) => r.get_dependency(),
54            CResource::OwnedOnStack(_) | CResource::Borrowed(_) => None,
55        }
56    }
57
58    #[inline]
59    pub fn as_owned(&self) -> Option<&std::rc::Rc<ManagedCResource<T>>> {
60        match self {
61            CResource::OwnedOnHeap(r) => Some(r),
62            CResource::OwnedOnStack(_) | CResource::Borrowed(_) => None,
63        }
64    }
65}
66
67impl<T> std::fmt::Debug for CResource<T> {
68    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69        let name = std::any::type_name::<T>();
70
71        match self {
72            CResource::OwnedOnHeap(r) => {
73                write!(f, "{name} heap({:?})", r)
74            }
75            CResource::OwnedOnStack(r) => {
76                write!(f, "{name} stack({:?})", *r)
77            }
78            CResource::Borrowed(r) => {
79                write!(f, "{name} borrowed ({:?})", r)
80            }
81        }
82    }
83}
84
85/// A custom struct for managing C resources with automatic cleanup.
86///
87/// It handles initialisation and clean-up of the resource and ensures that resources
88/// are properly released when they go out of scope.
89#[allow(dead_code)]
90pub struct ManagedCResource<T> {
91    resource: *mut T,
92    cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
93    cleanup_struct: bool,
94    /// if someone externally rusteron calls close
95    close_already_called: std::cell::Cell<bool>,
96    /// if there is a c method to verify it someone has closed it, only few structs have this functionality
97    check_for_is_closed: Option<fn(*mut T) -> bool>,
98    /// this will be called if closed hasn't already happened even if its borrowed
99    auto_close: std::cell::Cell<bool>,
100    /// to prevent the dependencies from being dropped as you have a copy here,
101    /// for example, you want to have a dependency to aeron for any async jobs so aeron doesnt get dropped first
102    /// when you have a publication/subscription
103    /// Note empty vec does not allocate on heap
104    dependencies: UnsafeCell<Vec<std::rc::Rc<dyn std::any::Any>>>,
105}
106
107impl<T> std::fmt::Debug for ManagedCResource<T> {
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109        let mut debug_struct = f.debug_struct("ManagedCResource");
110
111        if !self.close_already_called.get()
112            && !self.resource.is_null()
113            && !self
114                .check_for_is_closed
115                .as_ref()
116                .map_or(false, |f| f(self.resource))
117        {
118            debug_struct.field("resource", &self.resource);
119        }
120
121        debug_struct
122            .field("type", &std::any::type_name::<T>())
123            .finish()
124    }
125}
126
127impl<T> ManagedCResource<T> {
128    /// Creates a new ManagedCResource with a given initializer and cleanup function.
129    ///
130    /// The initializer is a closure that attempts to initialize the resource.
131    /// If initialization fails, the initializer should return an error code.
132    /// The cleanup function is used to release the resource when it's no longer needed.
133    /// `cleanup_struct` where it should clean up the struct in rust
134    pub fn new(
135        init: impl FnOnce(*mut *mut T) -> i32,
136        cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
137        cleanup_struct: bool,
138        check_for_is_closed: Option<fn(*mut T) -> bool>,
139    ) -> Result<Self, AeronCError> {
140        let resource = Self::initialise(init)?;
141
142        let result = Self {
143            resource,
144            cleanup,
145            cleanup_struct,
146            close_already_called: std::cell::Cell::new(false),
147            check_for_is_closed,
148            auto_close: std::cell::Cell::new(false),
149            dependencies: UnsafeCell::new(vec![]),
150        };
151        #[cfg(feature = "extra-logging")]
152        log::info!("created c resource: {:?}", result);
153        Ok(result)
154    }
155
156    pub fn initialise(
157        init: impl FnOnce(*mut *mut T) -> i32 + Sized,
158    ) -> Result<*mut T, AeronCError> {
159        let mut resource: *mut T = std::ptr::null_mut();
160        let result = init(&mut resource);
161        if result < 0 || resource.is_null() {
162            return Err(AeronCError::from_code(result));
163        }
164        Ok(resource)
165    }
166
167    pub fn is_closed_already_called(&self) -> bool {
168        self.close_already_called.get()
169            || self.resource.is_null()
170            || self
171                .check_for_is_closed
172                .as_ref()
173                .map_or(false, |f| f(self.resource))
174    }
175
176    /// Gets a raw pointer to the resource.
177    #[inline(always)]
178    pub fn get(&self) -> *mut T {
179        self.resource
180    }
181
182    #[inline(always)]
183    pub fn get_mut(&self) -> &mut T {
184        unsafe { &mut *self.resource }
185    }
186
187    #[inline]
188    // to prevent the dependencies from being dropped as you have a copy here
189    pub fn add_dependency<D: std::any::Any>(&self, dep: D) {
190        if let Some(dep) =
191            (&dep as &dyn std::any::Any).downcast_ref::<std::rc::Rc<dyn std::any::Any>>()
192        {
193            unsafe {
194                (*self.dependencies.get()).push(dep.clone());
195            }
196        } else {
197            unsafe {
198                (*self.dependencies.get()).push(std::rc::Rc::new(dep));
199            }
200        }
201    }
202
203    #[inline]
204    pub fn get_dependency<V: Clone + 'static>(&self) -> Option<V> {
205        unsafe {
206            (*self.dependencies.get())
207                .iter()
208                .filter_map(|x| x.as_ref().downcast_ref::<V>().cloned())
209                .next()
210        }
211    }
212
213    /// Closes the resource by calling the cleanup function.
214    ///
215    /// If cleanup fails, it returns an `AeronError`.
216    pub fn close(&mut self) -> Result<(), AeronCError> {
217        if self.close_already_called.get() {
218            return Ok(());
219        }
220        self.close_already_called.set(true);
221
222        let already_closed = self
223            .check_for_is_closed
224            .as_ref()
225            .map_or(false, |f| f(self.resource));
226
227        if let Some(mut cleanup) = self.cleanup.take() {
228            if !self.resource.is_null() {
229                if !already_closed {
230                    let result = cleanup(&mut self.resource);
231                    if result < 0 {
232                        return Err(AeronCError::from_code(result));
233                    }
234                }
235                self.resource = std::ptr::null_mut();
236            }
237        }
238
239        Ok(())
240    }
241}
242
243impl<T> Drop for ManagedCResource<T> {
244    fn drop(&mut self) {
245        if !self.resource.is_null() {
246            let already_closed = self.close_already_called.get()
247                || self
248                    .check_for_is_closed
249                    .as_ref()
250                    .map_or(false, |f| f(self.resource));
251
252            let resource = if already_closed {
253                self.resource
254            } else {
255                self.resource.clone()
256            };
257
258            if !already_closed {
259                // Ensure the clean-up function is called when the resource is dropped.
260                #[cfg(feature = "extra-logging")]
261                log::info!("closing c resource: {:?}", self);
262                let _ = self.close(); // Ignore errors during an automatic drop to avoid panics.
263            }
264            self.close_already_called.set(true);
265
266            if self.cleanup_struct {
267                #[cfg(feature = "extra-logging")]
268                log::info!("closing rust struct resource: {:?}", resource);
269                unsafe {
270                    let _ = Box::from_raw(resource);
271                }
272            }
273        }
274    }
275}
276
277#[derive(Debug, PartialOrd, Eq, PartialEq, Clone)]
278pub enum AeronErrorType {
279    GenericError,
280    ClientErrorDriverTimeout,
281    ClientErrorClientTimeout,
282    ClientErrorConductorServiceTimeout,
283    ClientErrorBufferFull,
284    PublicationBackPressured,
285    PublicationAdminAction,
286    PublicationClosed,
287    PublicationMaxPositionExceeded,
288    PublicationError,
289    TimedOut,
290    Unknown(i32),
291}
292
293impl From<AeronErrorType> for AeronCError {
294    fn from(value: AeronErrorType) -> Self {
295        AeronCError::from_code(value.code())
296    }
297}
298
299impl AeronErrorType {
300    pub fn code(&self) -> i32 {
301        match self {
302            AeronErrorType::GenericError => -1,
303            AeronErrorType::ClientErrorDriverTimeout => -1000,
304            AeronErrorType::ClientErrorClientTimeout => -1001,
305            AeronErrorType::ClientErrorConductorServiceTimeout => -1002,
306            AeronErrorType::ClientErrorBufferFull => -1003,
307            AeronErrorType::PublicationBackPressured => -2,
308            AeronErrorType::PublicationAdminAction => -3,
309            AeronErrorType::PublicationClosed => -4,
310            AeronErrorType::PublicationMaxPositionExceeded => -5,
311            AeronErrorType::PublicationError => -6,
312            AeronErrorType::TimedOut => -234324,
313            AeronErrorType::Unknown(code) => *code,
314        }
315    }
316
317    pub fn is_back_pressured(&self) -> bool {
318        self == &AeronErrorType::PublicationBackPressured
319    }
320
321    pub fn is_admin_action(&self) -> bool {
322        self == &AeronErrorType::PublicationAdminAction
323    }
324
325    pub fn is_back_pressured_or_admin_action(&self) -> bool {
326        self.is_back_pressured() || self.is_admin_action()
327    }
328
329    pub fn from_code(code: i32) -> Self {
330        match code {
331            -1 => AeronErrorType::GenericError,
332            -1000 => AeronErrorType::ClientErrorDriverTimeout,
333            -1001 => AeronErrorType::ClientErrorClientTimeout,
334            -1002 => AeronErrorType::ClientErrorConductorServiceTimeout,
335            -1003 => AeronErrorType::ClientErrorBufferFull,
336            -2 => AeronErrorType::PublicationBackPressured,
337            -3 => AeronErrorType::PublicationAdminAction,
338            -4 => AeronErrorType::PublicationClosed,
339            -5 => AeronErrorType::PublicationMaxPositionExceeded,
340            -6 => AeronErrorType::PublicationError,
341            -234324 => AeronErrorType::TimedOut,
342            _ => Unknown(code),
343        }
344    }
345
346    pub fn to_string(&self) -> &'static str {
347        match self {
348            AeronErrorType::GenericError => "Generic Error",
349            AeronErrorType::ClientErrorDriverTimeout => "Client Error Driver Timeout",
350            AeronErrorType::ClientErrorClientTimeout => "Client Error Client Timeout",
351            AeronErrorType::ClientErrorConductorServiceTimeout => {
352                "Client Error Conductor Service Timeout"
353            }
354            AeronErrorType::ClientErrorBufferFull => "Client Error Buffer Full",
355            AeronErrorType::PublicationBackPressured => "Publication Back Pressured",
356            AeronErrorType::PublicationAdminAction => "Publication Admin Action",
357            AeronErrorType::PublicationClosed => "Publication Closed",
358            AeronErrorType::PublicationMaxPositionExceeded => "Publication Max Position Exceeded",
359            AeronErrorType::PublicationError => "Publication Error",
360            AeronErrorType::TimedOut => "Timed Out",
361            AeronErrorType::Unknown(_) => "Unknown Error",
362        }
363    }
364}
365
366/// Represents an Aeron-specific error with a code and an optional message.
367///
368/// The error code is derived from Aeron C API calls.
369/// Use `get_last_err_message()` to retrieve the last human-readable message, if available.
370#[derive(Eq, PartialEq, Clone)]
371pub struct AeronCError {
372    pub code: i32,
373}
374
375impl AeronCError {
376    /// Creates an AeronError from the error code returned by Aeron.
377    ///
378    /// Error codes below zero are considered failure.
379    pub fn from_code(code: i32) -> Self {
380        #[cfg(feature = "backtrace")]
381        {
382            if code < 0 {
383                let backtrace = Backtrace::capture();
384                let backtrace = format!("{:?}", backtrace);
385
386                let re =
387                    regex::Regex::new(r#"fn: "([^"]+)", file: "([^"]+)", line: (\d+)"#).unwrap();
388                let mut lines = String::new();
389                re.captures_iter(&backtrace).for_each(|cap| {
390                    let function = &cap[1];
391                    let mut file = cap[2].to_string();
392                    let line = &cap[3];
393                    if file.starts_with("./") {
394                        file = format!("{}/{}", env!("CARGO_MANIFEST_DIR"), &file[2..]);
395                    } else if file.starts_with("/rustc/") {
396                        file = file.split("/").last().unwrap().to_string();
397                    }
398                    // log in intellij friendly error format so can hyperlink to source code in stack trace
399                    lines.push_str(&format!(" {file}:{line} in {function}\n"));
400                });
401
402                log::error!(
403                    "Aeron C error code: {}, kind: '{:?}'\n{}",
404                    code,
405                    AeronErrorType::from_code(code),
406                    lines
407                );
408            }
409        }
410        AeronCError { code }
411    }
412
413    pub fn kind(&self) -> AeronErrorType {
414        AeronErrorType::from_code(self.code)
415    }
416
417    pub fn is_back_pressured(&self) -> bool {
418        self.kind().is_back_pressured()
419    }
420
421    pub fn is_admin_action(&self) -> bool {
422        self.kind().is_admin_action()
423    }
424
425    pub fn is_back_pressured_or_admin_action(&self) -> bool {
426        self.kind().is_back_pressured_or_admin_action()
427    }
428}
429
430/// # Handler
431///
432/// `Handler` is a struct that wraps a raw pointer and a drop flag.
433///
434/// **Important:** `Handler` *MAY* not get dropped automatically. It depends if aeron takes ownership.
435/// For example for global level handlers e.g. error handler aeron will release this handle when closing.
436///
437/// You need to call the `release` method if you want to clear the memory manually.
438/// Its important that you test this out as aeron may do it when closing aeron client.
439///
440/// ## Example
441///
442/// ```no_compile
443/// use rusteron_code_gen::Handler;
444/// let handler = Handler::leak(your_value);
445/// // When you are done with the handler
446/// handler.release();
447/// ```
448pub struct Handler<T> {
449    raw_ptr: *mut T,
450    should_drop: bool,
451}
452
453unsafe impl<T> Send for Handler<T> {}
454unsafe impl<T> Sync for Handler<T> {}
455
456/// Utility method for setting empty handlers
457pub struct Handlers;
458
459impl<T> Handler<T> {
460    pub fn leak(handler: T) -> Self {
461        let raw_ptr = Box::into_raw(Box::new(handler)) as *mut _;
462        #[cfg(feature = "extra-logging")]
463        log::info!("creating handler {:?}", raw_ptr);
464        Self {
465            raw_ptr,
466            should_drop: true,
467        }
468    }
469
470    pub fn is_none(&self) -> bool {
471        self.raw_ptr.is_null()
472    }
473
474    pub fn as_raw(&self) -> *mut std::os::raw::c_void {
475        self.raw_ptr as *mut std::os::raw::c_void
476    }
477
478    pub fn release(&mut self) {
479        if self.should_drop && !self.raw_ptr.is_null() {
480            unsafe {
481                #[cfg(feature = "extra-logging")]
482                log::info!("dropping handler {:?}", self.raw_ptr);
483                let _ = Box::from_raw(self.raw_ptr as *mut T);
484                self.should_drop = false;
485            }
486        }
487    }
488
489    pub unsafe fn new(raw_ptr: *mut T, should_drop: bool) -> Self {
490        Self {
491            raw_ptr,
492            should_drop,
493        }
494    }
495}
496
497impl<T> Deref for Handler<T> {
498    type Target = T;
499
500    fn deref(&self) -> &Self::Target {
501        unsafe { &*self.raw_ptr as &T }
502    }
503}
504
505impl<T> DerefMut for Handler<T> {
506    fn deref_mut(&mut self) -> &mut Self::Target {
507        unsafe { &mut *self.raw_ptr as &mut T }
508    }
509}
510
511pub fn find_unused_udp_port(start_port: u16) -> Option<u16> {
512    let end_port = u16::MAX;
513
514    for port in start_port..=end_port {
515        if is_udp_port_available(port) {
516            return Some(port);
517        }
518    }
519
520    None
521}
522
523pub fn is_udp_port_available(port: u16) -> bool {
524    std::net::UdpSocket::bind(("127.0.0.1", port)).is_ok()
525}
526
527/// Represents the Aeron URI parser and handler.
528pub struct ChannelUri {}
529
530impl ChannelUri {
531    pub const AERON_SCHEME: &'static str = "aeron";
532    pub const SPY_QUALIFIER: &'static str = "aeron-spy";
533    pub const MAX_URI_LENGTH: usize = 4095;
534}
535
536pub const DRIVER_TIMEOUT_MS_DEFAULT: u64 = 10_000;
537pub const AERON_DIR_PROP_NAME: &str = "aeron.dir";
538pub const AERON_IPC_MEDIA: &str = "aeron:ipc";
539pub const AERON_UDP_MEDIA: &str = "aeron:udp";
540pub const SPY_PREFIX: &str = "aeron-spy:";
541pub const TAG_PREFIX: &str = "tag:";
542
543/// Enum for media types.
544#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
545pub enum Media {
546    Ipc,
547    Udp,
548}
549
550impl Media {
551    pub fn as_str(&self) -> &'static str {
552        match self {
553            Media::Ipc => "ipc",
554            Media::Udp => "udp",
555        }
556    }
557}
558
559/// Enum for control modes.
560#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
561pub enum ControlMode {
562    Manual,
563    Dynamic,
564    /// this is a beta feature useful when dealing with docker containers and networking
565    Response,
566}
567
568impl ControlMode {
569    pub fn as_str(&self) -> &'static str {
570        match self {
571            ControlMode::Manual => "manual",
572            ControlMode::Dynamic => "dynamic",
573            ControlMode::Response => "response",
574        }
575    }
576}
577
578#[cfg(test)]
579#[allow(dead_code)]
580pub(crate) mod test_alloc {
581    use std::alloc::{GlobalAlloc, Layout, System};
582    use std::sync::atomic::{AtomicIsize, Ordering};
583
584    /// A simple global allocator that tracks the net allocation count.
585    /// For very simple examples can do allocation count before and after your test.
586    /// This does not work well with logger, running media driver, etc. Only for the most
587    /// basic controlled examples
588    pub struct CountingAllocator {
589        allocs: AtomicIsize,
590    }
591
592    impl CountingAllocator {
593        pub const fn new() -> Self {
594            Self {
595                allocs: AtomicIsize::new(0),
596            }
597        }
598        /// Returns the current allocation counter value.
599        fn current(&self) -> isize {
600            self.allocs.load(Ordering::SeqCst)
601        }
602    }
603
604    unsafe impl GlobalAlloc for CountingAllocator {
605        unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
606            self.allocs.fetch_add(1, Ordering::SeqCst);
607            System.alloc(layout)
608        }
609        unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
610            self.allocs.fetch_sub(1, Ordering::SeqCst);
611            System.dealloc(ptr, layout)
612        }
613    }
614
615    #[global_allocator]
616    static GLOBAL: CountingAllocator = CountingAllocator::new();
617
618    /// Returns the current allocation counter value.
619    pub fn current_allocs() -> isize {
620        GLOBAL.current()
621    }
622}
623
624pub trait IntoCString {
625    fn into_c_string(self) -> std::ffi::CString;
626}
627
628impl IntoCString for std::ffi::CString {
629    fn into_c_string(self) -> std::ffi::CString {
630        self
631    }
632}
633
634impl IntoCString for &str {
635    fn into_c_string(self) -> std::ffi::CString {
636        #[cfg(feature = "extra-logging")]
637        log::info!("created c string on heap: {:?}", self);
638
639        std::ffi::CString::new(self).expect("failed to create CString")
640    }
641}
642
643impl IntoCString for String {
644    fn into_c_string(self) -> std::ffi::CString {
645        #[cfg(feature = "extra-logging")]
646        log::info!("created c string on heap: {:?}", self);
647
648        std::ffi::CString::new(self).expect("failed to create CString")
649    }
650}