rusteron_code_gen/
common.rs

1use crate::AeronErrorType::Unknown;
2#[cfg(feature = "backtrace")]
3use std::backtrace::Backtrace;
4use std::cell::UnsafeCell;
5use std::fmt::Formatter;
6use std::mem::MaybeUninit;
7use std::ops::{Deref, DerefMut};
8
9pub enum CResource<T> {
10    OwnedOnHeap(std::rc::Rc<ManagedCResource<T>>),
11    /// stored on stack, unsafe, use with care
12    OwnedOnStack(std::mem::MaybeUninit<T>),
13    Borrowed(*mut T),
14}
15
16impl<T: Clone> Clone for CResource<T> {
17    fn clone(&self) -> Self {
18        unsafe {
19            match self {
20                CResource::OwnedOnHeap(r) => CResource::OwnedOnHeap(r.clone()),
21                CResource::OwnedOnStack(r) => {
22                    CResource::OwnedOnStack(MaybeUninit::new(r.assume_init_ref().clone()))
23                }
24                CResource::Borrowed(r) => CResource::Borrowed(r.clone()),
25            }
26        }
27    }
28}
29
30impl<T> CResource<T> {
31    #[inline]
32    pub fn get(&self) -> *mut T {
33        match self {
34            CResource::OwnedOnHeap(r) => r.get(),
35            CResource::OwnedOnStack(r) => r.as_ptr() as *mut T,
36            CResource::Borrowed(r) => *r,
37        }
38    }
39
40    #[inline]
41    // to prevent the dependencies from being dropped as you have a copy here
42    pub fn add_dependency<D: std::any::Any>(&self, dep: D) {
43        match self {
44            CResource::OwnedOnHeap(r) => r.add_dependency(dep),
45            CResource::OwnedOnStack(_) | CResource::Borrowed(_) => {
46                unreachable!("only owned on heap")
47            }
48        }
49    }
50    #[inline]
51    pub fn get_dependency<V: Clone + 'static>(&self) -> Option<V> {
52        match self {
53            CResource::OwnedOnHeap(r) => r.get_dependency(),
54            CResource::OwnedOnStack(_) | CResource::Borrowed(_) => None,
55        }
56    }
57
58    #[inline]
59    pub fn as_owned(&self) -> Option<&std::rc::Rc<ManagedCResource<T>>> {
60        match self {
61            CResource::OwnedOnHeap(r) => Some(r),
62            CResource::OwnedOnStack(_) | CResource::Borrowed(_) => None,
63        }
64    }
65}
66
67impl<T> std::fmt::Debug for CResource<T> {
68    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69        let name = std::any::type_name::<T>();
70
71        match self {
72            CResource::OwnedOnHeap(r) => {
73                write!(f, "{name} heap({:?})", r)
74            }
75            CResource::OwnedOnStack(r) => {
76                write!(f, "{name} stack({:?})", *r)
77            }
78            CResource::Borrowed(r) => {
79                write!(f, "{name} borrowed ({:?})", r)
80            }
81        }
82    }
83}
84
/// Owns a raw C resource pointer together with everything needed to release it.
///
/// The pointer is produced by an initialiser and is handed to the optional
/// `cleanup` callback when the resource is closed or dropped.
#[allow(dead_code)]
pub struct ManagedCResource<T> {
    /// Raw pointer to the underlying resource; `close` resets it to null after cleanup.
    resource: *mut T,
    /// Callback that releases the resource; `close` takes it, so it runs at most once.
    cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
    /// When `true`, dropping this value also frees `resource` as a Rust `Box`.
    cleanup_struct: bool,
    /// Records that close was already called (for example by code outside rusteron).
    close_already_called: std::cell::Cell<bool>,
    /// Optional C-side probe telling whether the resource was already closed;
    /// only a few structs provide one.
    check_for_is_closed: Option<fn(*mut T) -> bool>,
    /// Requests that the resource be closed if that has not happened yet, even
    /// when it is borrowed (the flag is not consulted in this part of the file).
    auto_close: std::cell::Cell<bool>,
    /// Indicates that the underlying resource has already been handed off and
    /// should not be re-polled.
    resource_released: std::cell::Cell<bool>,
    /// Values kept alive for as long as this resource lives, e.g. a handle to
    /// aeron held by a publication/subscription so aeron is not dropped first
    /// while async jobs still need it.
    /// Note: an empty `Vec` does not allocate on the heap.
    dependencies: UnsafeCell<Vec<std::rc::Rc<dyn std::any::Any>>>,
}
108
109impl<T> std::fmt::Debug for ManagedCResource<T> {
110    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111        let mut debug_struct = f.debug_struct("ManagedCResource");
112
113        if !self.close_already_called.get()
114            && !self.resource.is_null()
115            && !self
116                .check_for_is_closed
117                .as_ref()
118                .map_or(false, |f| f(self.resource))
119        {
120            debug_struct.field("resource", &self.resource);
121        }
122
123        debug_struct
124            .field("type", &std::any::type_name::<T>())
125            .finish()
126    }
127}
128
129impl<T> ManagedCResource<T> {
130    /// Creates a new ManagedCResource with a given initializer and cleanup function.
131    ///
132    /// The initializer is a closure that attempts to initialize the resource.
133    /// If initialization fails, the initializer should return an error code.
134    /// The cleanup function is used to release the resource when it's no longer needed.
135    /// `cleanup_struct` where it should clean up the struct in rust
136    pub fn new(
137        init: impl FnOnce(*mut *mut T) -> i32,
138        cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
139        cleanup_struct: bool,
140        check_for_is_closed: Option<fn(*mut T) -> bool>,
141    ) -> Result<Self, AeronCError> {
142        let resource = Self::initialise(init)?;
143
144        let result = Self {
145            resource,
146            cleanup,
147            cleanup_struct,
148            close_already_called: std::cell::Cell::new(false),
149            check_for_is_closed,
150            auto_close: std::cell::Cell::new(false),
151            resource_released: std::cell::Cell::new(false),
152            dependencies: UnsafeCell::new(vec![]),
153        };
154        #[cfg(feature = "extra-logging")]
155        log::info!("created c resource: {:?}", result);
156        Ok(result)
157    }
158
159    pub fn initialise(
160        init: impl FnOnce(*mut *mut T) -> i32 + Sized,
161    ) -> Result<*mut T, AeronCError> {
162        let mut resource: *mut T = std::ptr::null_mut();
163        let result = init(&mut resource);
164        if result < 0 || resource.is_null() {
165            return Err(AeronCError::from_code(result));
166        }
167        Ok(resource)
168    }
169
170    pub fn is_closed_already_called(&self) -> bool {
171        self.close_already_called.get()
172            || self.resource.is_null()
173            || self
174                .check_for_is_closed
175                .as_ref()
176                .map_or(false, |f| f(self.resource))
177    }
178
179    /// Gets a raw pointer to the resource.
180    #[inline(always)]
181    pub fn get(&self) -> *mut T {
182        self.resource
183    }
184
185    #[inline(always)]
186    pub fn get_mut(&self) -> &mut T {
187        unsafe { &mut *self.resource }
188    }
189
190    #[inline]
191    // to prevent the dependencies from being dropped as you have a copy here
192    pub fn add_dependency<D: std::any::Any>(&self, dep: D) {
193        if let Some(dep) =
194            (&dep as &dyn std::any::Any).downcast_ref::<std::rc::Rc<dyn std::any::Any>>()
195        {
196            unsafe {
197                (*self.dependencies.get()).push(dep.clone());
198            }
199        } else {
200            unsafe {
201                (*self.dependencies.get()).push(std::rc::Rc::new(dep));
202            }
203        }
204    }
205
206    #[inline]
207    pub fn get_dependency<V: Clone + 'static>(&self) -> Option<V> {
208        unsafe {
209            (*self.dependencies.get())
210                .iter()
211                .filter_map(|x| x.as_ref().downcast_ref::<V>().cloned())
212                .next()
213        }
214    }
215
216    #[inline]
217    pub fn is_resource_released(&self) -> bool {
218        self.resource_released.get()
219    }
220
221    #[inline]
222    pub fn mark_resource_released(&self) {
223        self.resource_released.set(true);
224    }
225
226    /// Closes the resource by calling the cleanup function.
227    ///
228    /// If cleanup fails, it returns an `AeronError`.
229    pub fn close(&mut self) -> Result<(), AeronCError> {
230        if self.close_already_called.get() {
231            return Ok(());
232        }
233        self.close_already_called.set(true);
234
235        let already_closed = self
236            .check_for_is_closed
237            .as_ref()
238            .map_or(false, |f| f(self.resource));
239
240        if let Some(mut cleanup) = self.cleanup.take() {
241            if !self.resource.is_null() {
242                if !already_closed {
243                    let result = cleanup(&mut self.resource);
244                    if result < 0 {
245                        return Err(AeronCError::from_code(result));
246                    }
247                }
248                self.resource = std::ptr::null_mut();
249            }
250        }
251
252        Ok(())
253    }
254}
255
256impl<T> Drop for ManagedCResource<T> {
257    fn drop(&mut self) {
258        if !self.resource.is_null() {
259            let already_closed = self.close_already_called.get()
260                || self
261                    .check_for_is_closed
262                    .as_ref()
263                    .map_or(false, |f| f(self.resource));
264
265            let resource = if already_closed {
266                self.resource
267            } else {
268                self.resource.clone()
269            };
270
271            if !already_closed {
272                // Ensure the clean-up function is called when the resource is dropped.
273                #[cfg(feature = "extra-logging")]
274                log::info!("closing c resource: {:?}", self);
275                let _ = self.close(); // Ignore errors during an automatic drop to avoid panics.
276            }
277            self.close_already_called.set(true);
278
279            if self.cleanup_struct {
280                #[cfg(feature = "extra-logging")]
281                log::info!("closing rust struct resource: {:?}", resource);
282                unsafe {
283                    let _ = Box::from_raw(resource);
284                }
285            }
286        }
287    }
288}
289
/// Classification of the status codes handled by this module.
///
/// The numeric value of each variant is given by `AeronErrorType::code`.
#[derive(Debug, PartialOrd, Eq, PartialEq, Clone)]
pub enum AeronErrorType {
    /// Code `-1`.
    GenericError,
    /// Code `-1000`.
    ClientErrorDriverTimeout,
    /// Code `-1001`.
    ClientErrorClientTimeout,
    /// Code `-1002`.
    ClientErrorConductorServiceTimeout,
    /// Code `-1003`.
    ClientErrorBufferFull,
    /// Code `-2`.
    PublicationBackPressured,
    /// Code `-3`.
    PublicationAdminAction,
    /// Code `-4`.
    PublicationClosed,
    /// Code `-5`.
    PublicationMaxPositionExceeded,
    /// Code `-6`.
    PublicationError,
    /// Code `-234324`.
    TimedOut,
    /// Any other code, carried verbatim.
    Unknown(i32),
}
305
306impl From<AeronErrorType> for AeronCError {
307    fn from(value: AeronErrorType) -> Self {
308        AeronCError::from_code(value.code())
309    }
310}
311
312impl AeronErrorType {
313    pub fn code(&self) -> i32 {
314        match self {
315            AeronErrorType::GenericError => -1,
316            AeronErrorType::ClientErrorDriverTimeout => -1000,
317            AeronErrorType::ClientErrorClientTimeout => -1001,
318            AeronErrorType::ClientErrorConductorServiceTimeout => -1002,
319            AeronErrorType::ClientErrorBufferFull => -1003,
320            AeronErrorType::PublicationBackPressured => -2,
321            AeronErrorType::PublicationAdminAction => -3,
322            AeronErrorType::PublicationClosed => -4,
323            AeronErrorType::PublicationMaxPositionExceeded => -5,
324            AeronErrorType::PublicationError => -6,
325            AeronErrorType::TimedOut => -234324,
326            AeronErrorType::Unknown(code) => *code,
327        }
328    }
329
330    pub fn is_back_pressured(&self) -> bool {
331        self == &AeronErrorType::PublicationBackPressured
332    }
333
334    pub fn is_admin_action(&self) -> bool {
335        self == &AeronErrorType::PublicationAdminAction
336    }
337
338    pub fn is_back_pressured_or_admin_action(&self) -> bool {
339        self.is_back_pressured() || self.is_admin_action()
340    }
341
342    pub fn from_code(code: i32) -> Self {
343        match code {
344            -1 => AeronErrorType::GenericError,
345            -1000 => AeronErrorType::ClientErrorDriverTimeout,
346            -1001 => AeronErrorType::ClientErrorClientTimeout,
347            -1002 => AeronErrorType::ClientErrorConductorServiceTimeout,
348            -1003 => AeronErrorType::ClientErrorBufferFull,
349            -2 => AeronErrorType::PublicationBackPressured,
350            -3 => AeronErrorType::PublicationAdminAction,
351            -4 => AeronErrorType::PublicationClosed,
352            -5 => AeronErrorType::PublicationMaxPositionExceeded,
353            -6 => AeronErrorType::PublicationError,
354            -234324 => AeronErrorType::TimedOut,
355            _ => Unknown(code),
356        }
357    }
358
359    pub fn to_string(&self) -> &'static str {
360        match self {
361            AeronErrorType::GenericError => "Generic Error",
362            AeronErrorType::ClientErrorDriverTimeout => "Client Error Driver Timeout",
363            AeronErrorType::ClientErrorClientTimeout => "Client Error Client Timeout",
364            AeronErrorType::ClientErrorConductorServiceTimeout => {
365                "Client Error Conductor Service Timeout"
366            }
367            AeronErrorType::ClientErrorBufferFull => "Client Error Buffer Full",
368            AeronErrorType::PublicationBackPressured => "Publication Back Pressured",
369            AeronErrorType::PublicationAdminAction => "Publication Admin Action",
370            AeronErrorType::PublicationClosed => "Publication Closed",
371            AeronErrorType::PublicationMaxPositionExceeded => "Publication Max Position Exceeded",
372            AeronErrorType::PublicationError => "Publication Error",
373            AeronErrorType::TimedOut => "Timed Out",
374            AeronErrorType::Unknown(_) => "Unknown Error",
375        }
376    }
377}
378
/// Aeron-specific error identified by the status code of an Aeron C API call.
///
/// Use `get_last_err_message()` to retrieve the last human-readable message, if available.
#[derive(Eq, PartialEq, Clone)]
pub struct AeronCError {
    /// Raw status code; `AeronCError::from_code` treats values below zero as failures.
    pub code: i32,
}
387
388impl AeronCError {
389    /// Creates an AeronError from the error code returned by Aeron.
390    ///
391    /// Error codes below zero are considered failure.
392    pub fn from_code(code: i32) -> Self {
393        #[cfg(feature = "backtrace")]
394        {
395            if code < 0 {
396                let backtrace = Backtrace::capture();
397                let backtrace = format!("{:?}", backtrace);
398
399                let re =
400                    regex::Regex::new(r#"fn: "([^"]+)", file: "([^"]+)", line: (\d+)"#).unwrap();
401                let mut lines = String::new();
402                re.captures_iter(&backtrace).for_each(|cap| {
403                    let function = &cap[1];
404                    let mut file = cap[2].to_string();
405                    let line = &cap[3];
406                    if file.starts_with("./") {
407                        file = format!("{}/{}", env!("CARGO_MANIFEST_DIR"), &file[2..]);
408                    } else if file.starts_with("/rustc/") {
409                        file = file.split("/").last().unwrap().to_string();
410                    }
411                    // log in intellij friendly error format so can hyperlink to source code in stack trace
412                    lines.push_str(&format!(" {file}:{line} in {function}\n"));
413                });
414
415                log::error!(
416                    "Aeron C error code: {}, kind: '{:?}'\n{}",
417                    code,
418                    AeronErrorType::from_code(code),
419                    lines
420                );
421            }
422        }
423        AeronCError { code }
424    }
425
426    pub fn kind(&self) -> AeronErrorType {
427        AeronErrorType::from_code(self.code)
428    }
429
430    pub fn is_back_pressured(&self) -> bool {
431        self.kind().is_back_pressured()
432    }
433
434    pub fn is_admin_action(&self) -> bool {
435        self.kind().is_admin_action()
436    }
437
438    pub fn is_back_pressured_or_admin_action(&self) -> bool {
439        self.kind().is_back_pressured_or_admin_action()
440    }
441}
442
/// # Handler
///
/// `Handler` is a struct that wraps a raw pointer and a drop flag.
///
/// **Important:** a `Handler` *MAY* not get dropped automatically; that depends on whether
/// aeron takes ownership. For example, for global-level handlers (e.g. the error handler)
/// aeron will release the handle when closing.
///
/// Call the `release` method if you want to free the memory manually.
/// It's important to test this, as aeron may already do it when the aeron client closes.
///
/// ## Example
///
/// ```no_compile
/// use rusteron_code_gen::Handler;
/// let handler = Handler::leak(your_value);
/// // When you are done with the handler
/// handler.release();
/// ```
pub struct Handler<T> {
    /// Pointer to the handler value; null for an empty handler (see `is_none`).
    raw_ptr: *mut T,
    /// Whether `release` is allowed to free the pointee.
    should_drop: bool,
}

// SAFETY: NOTE(review): both markers are asserted for every `T` without any
// bound; soundness depends on how handlers are shared across threads - confirm.
unsafe impl<T> Send for Handler<T> {}
unsafe impl<T> Sync for Handler<T> {}
468
/// Utility type for setting empty handlers (its helper methods are not
/// defined in this part of the file).
pub struct Handlers;
471
472impl<T> Handler<T> {
473    pub fn leak(handler: T) -> Self {
474        let raw_ptr = Box::into_raw(Box::new(handler)) as *mut _;
475        #[cfg(feature = "extra-logging")]
476        log::info!("creating handler {:?}", raw_ptr);
477        Self {
478            raw_ptr,
479            should_drop: true,
480        }
481    }
482
483    pub fn is_none(&self) -> bool {
484        self.raw_ptr.is_null()
485    }
486
487    pub fn as_raw(&self) -> *mut std::os::raw::c_void {
488        self.raw_ptr as *mut std::os::raw::c_void
489    }
490
491    pub fn release(&mut self) {
492        if self.should_drop && !self.raw_ptr.is_null() {
493            unsafe {
494                #[cfg(feature = "extra-logging")]
495                log::info!("dropping handler {:?}", self.raw_ptr);
496                let _ = Box::from_raw(self.raw_ptr as *mut T);
497                self.should_drop = false;
498            }
499        }
500    }
501
502    pub unsafe fn new(raw_ptr: *mut T, should_drop: bool) -> Self {
503        Self {
504            raw_ptr,
505            should_drop,
506        }
507    }
508}
509
510impl<T> Deref for Handler<T> {
511    type Target = T;
512
513    fn deref(&self) -> &Self::Target {
514        unsafe { &*self.raw_ptr as &T }
515    }
516}
517
518impl<T> DerefMut for Handler<T> {
519    fn deref_mut(&mut self) -> &mut Self::Target {
520        unsafe { &mut *self.raw_ptr as &mut T }
521    }
522}
523
524pub fn find_unused_udp_port(start_port: u16) -> Option<u16> {
525    let end_port = u16::MAX;
526
527    for port in start_port..=end_port {
528        if is_udp_port_available(port) {
529            return Some(port);
530        }
531    }
532
533    None
534}
535
/// Checks whether a UDP socket can currently be bound to `127.0.0.1:port`.
///
/// The probe socket is dropped right away, so the answer is only a snapshot.
pub fn is_udp_port_available(port: u16) -> bool {
    match std::net::UdpSocket::bind(("127.0.0.1", port)) {
        Ok(_probe) => true,
        Err(_) => false,
    }
}
539
/// Represents the Aeron URI parser and handler.
///
/// In this part of the file the type only serves as a holder for
/// URI-related constants.
pub struct ChannelUri {}
542
543impl ChannelUri {
544    pub const AERON_SCHEME: &'static str = "aeron";
545    pub const SPY_QUALIFIER: &'static str = "aeron-spy";
546    pub const MAX_URI_LENGTH: usize = 4095;
547}
548
/// Default driver timeout; milliseconds, going by the `_MS` in the name.
pub const DRIVER_TIMEOUT_MS_DEFAULT: u64 = 10_000;
/// Property name `"aeron.dir"`.
pub const AERON_DIR_PROP_NAME: &str = "aeron.dir";
/// Channel prefix for the IPC media.
pub const AERON_IPC_MEDIA: &str = "aeron:ipc";
/// Channel prefix for the UDP media.
pub const AERON_UDP_MEDIA: &str = "aeron:udp";
/// Prefix string `"aeron-spy:"`.
pub const SPY_PREFIX: &str = "aeron-spy:";
/// Prefix string `"tag:"`.
pub const TAG_PREFIX: &str = "tag:";
555
/// Media types; [`Media::as_str`] gives the lowercase name of each one.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Media {
    /// Rendered as `"ipc"`.
    Ipc,
    /// Rendered as `"udp"`.
    Udp,
}
562
563impl Media {
564    pub fn as_str(&self) -> &'static str {
565        match self {
566            Media::Ipc => "ipc",
567            Media::Udp => "udp",
568        }
569    }
570}
571
/// Control modes; [`ControlMode::as_str`] gives the lowercase name of each one.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ControlMode {
    /// Rendered as `"manual"`.
    Manual,
    /// Rendered as `"dynamic"`.
    Dynamic,
    /// this is a beta feature useful when dealing with docker containers and networking
    Response,
}
580
581impl ControlMode {
582    pub fn as_str(&self) -> &'static str {
583        match self {
584            ControlMode::Manual => "manual",
585            ControlMode::Dynamic => "dynamic",
586            ControlMode::Response => "response",
587        }
588    }
589}
590
#[cfg(test)]
#[allow(dead_code)]
pub(crate) mod test_alloc {
    use std::alloc::{GlobalAlloc, Layout, System};
    use std::sync::atomic::{AtomicIsize, Ordering};

    /// A simple global allocator that tracks the net allocation count.
    /// For very simple examples you can compare the allocation count before and after a test.
    /// This does not work well with a logger, a running media driver, etc. It is only for
    /// the most basic controlled examples.
    pub struct CountingAllocator {
        /// Successful allocations minus deallocations.
        allocs: AtomicIsize,
    }

    impl CountingAllocator {
        /// Creates an allocator whose counter starts at zero.
        pub const fn new() -> Self {
            Self {
                allocs: AtomicIsize::new(0),
            }
        }

        /// Returns the current allocation counter value.
        fn current(&self) -> isize {
            self.allocs.load(Ordering::SeqCst)
        }
    }

    unsafe impl GlobalAlloc for CountingAllocator {
        /// Delegates to the system allocator and counts the allocation.
        unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
            let ptr = System.alloc(layout);
            // A failed allocation returns null and is never passed to
            // `dealloc`; counting it would skew the net count for good.
            if !ptr.is_null() {
                self.allocs.fetch_add(1, Ordering::SeqCst);
            }
            ptr
        }

        /// Counts the deallocation and delegates to the system allocator.
        unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
            self.allocs.fetch_sub(1, Ordering::SeqCst);
            System.dealloc(ptr, layout)
        }
    }

    #[global_allocator]
    static GLOBAL: CountingAllocator = CountingAllocator::new();

    /// Returns the current allocation counter value.
    pub fn current_allocs() -> isize {
        GLOBAL.current()
    }
}
636
/// Conversion of an owned value into a [`std::ffi::CString`].
pub trait IntoCString {
    /// Consumes `self` and returns it as a `CString`.
    fn into_c_string(self) -> std::ffi::CString;
}
640
641impl IntoCString for std::ffi::CString {
642    fn into_c_string(self) -> std::ffi::CString {
643        self
644    }
645}
646
647impl IntoCString for &str {
648    fn into_c_string(self) -> std::ffi::CString {
649        #[cfg(feature = "extra-logging")]
650        log::info!("created c string on heap: {:?}", self);
651
652        std::ffi::CString::new(self).expect("failed to create CString")
653    }
654}
655
656impl IntoCString for String {
657    fn into_c_string(self) -> std::ffi::CString {
658        #[cfg(feature = "extra-logging")]
659        log::info!("created c string on heap: {:?}", self);
660
661        std::ffi::CString::new(self).expect("failed to create CString")
662    }
663}