rusteron_media_driver/
lib.rs

1#![allow(improper_ctypes_definitions)]
2#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
9//! # Features
10//!
11//! - **`static`**: When enabled, this feature statically links the Aeron C code.
12//!   By default, the library uses dynamic linking to the Aeron C libraries.
13//! - **`backtrace`**: When enabled will log a backtrace for each AeronCError
14//! - **`extra-logging`**: When enabled will log when resource is created and destroyed. Useful if you're seeing a segfault due to a resource being closed
15//! - **`log-c-bindings`**: When enabled will log every C binding call with arguments and return values. Useful for debugging FFI interactions
16//! - **`precompile`**: When enabled will use precompiled C code instead of requiring cmake and java to be installed
17
18#[allow(improper_ctypes_definitions)]
19#[allow(unpredictable_function_pointer_comparisons)]
20pub mod bindings {
21    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
22}
23
24use bindings::*;
25use log::info;
26use std::path::Path;
27use std::sync::atomic::{AtomicBool, Ordering};
28use std::sync::Arc;
29use std::thread::{sleep, JoinHandle};
30use std::time::Duration;
31
32include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
33include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
34
35unsafe impl Sync for AeronDriverContext {}
36unsafe impl Send for AeronDriverContext {}
37unsafe impl Sync for AeronDriver {}
38unsafe impl Send for AeronDriver {}
39
40impl AeronDriver {
41    pub fn launch_embedded(
42        aeron_context: AeronDriverContext,
43        register_sigint: bool,
44    ) -> (Arc<AtomicBool>, JoinHandle<Result<(), AeronCError>>) {
45        AeronDriver::wait_for_previous_media_driver_to_timeout(&aeron_context);
46
47        let stop = Arc::new(AtomicBool::new(false));
48        let stop_copy = stop.clone();
49        // Register signal handler for SIGINT (Ctrl+C)
50        if register_sigint {
51            let stop_copy2 = stop.clone();
52            ctrlc::set_handler(move || {
53                stop_copy2.store(true, Ordering::SeqCst);
54            })
55            .expect("Error setting Ctrl-C handler");
56        }
57
58        let started = Arc::new(AtomicBool::new(false));
59        let started2 = started.clone();
60
61        let dir = aeron_context.get_dir().to_string();
62        info!("Starting media driver [dir={}]", dir);
63        let handle = std::thread::spawn(move || {
64            let aeron_context = aeron_context.clone();
65            let aeron_driver = AeronDriver::new(&aeron_context)?;
66            aeron_driver.start(true)?;
67
68            info!(
69                "Aeron driver started [dir={}]",
70                aeron_driver.context().get_dir()
71            );
72
73            started2.store(true, Ordering::SeqCst);
74
75            // Poll for work until Ctrl+C is pressed
76            while !stop.load(Ordering::Acquire) {
77                aeron_driver.main_idle_strategy(aeron_driver.main_do_work()?);
78            }
79
80            info!("stopping media driver");
81
82            Ok::<_, AeronCError>(())
83        });
84
85        while !started.load(Ordering::SeqCst) && !handle.is_finished() {
86            sleep(Duration::from_millis(100));
87        }
88
89        if handle.is_finished() {
90            panic!("failed to start media driver {:?}", handle.join())
91        }
92        info!("started media driver [dir={}]", dir);
93
94        (stop_copy, handle)
95    }
96
97    /// if you have existing shm files and its before the driver timeout it will try to reuse it and fail
98    /// this makes sure that if that is the case it will wait else it proceeds
99    pub fn wait_for_previous_media_driver_to_timeout(aeron_context: &AeronDriverContext) {
100        if !aeron_context.get_dir_delete_on_start() {
101            let cnc_file = Path::new(aeron_context.get_dir()).join("cnc.dat");
102
103            if cnc_file.exists() {
104                let timeout = Duration::from_millis(aeron_context.get_driver_timeout_ms() * 2)
105                    .as_nanos() as i64;
106
107                let mut duration = timeout;
108
109                if let Ok(md) = cnc_file.metadata() {
110                    if let Ok(modified_time) = md.modified() {
111                        if let Ok(took) = modified_time.elapsed() {
112                            duration = took.as_nanos() as i64;
113                        }
114                    }
115                }
116
117                let delay = timeout - duration;
118
119                if delay > 0 {
120                    let sleep_duration = Duration::from_nanos((delay + 1_000_000) as u64);
121                    info!("cnc file already exists, will need to wait {sleep_duration:?} for timeout [file={cnc_file:?}]");
122                    sleep(sleep_duration);
123                }
124            }
125        }
126    }
127}
128
129#[cfg(test)]
130mod tests {
131    use super::*;
132    use log::error;
133    use std::os::raw::c_int;
134    use std::sync::atomic::Ordering;
135    use std::time::Duration;
136
137    #[test]
138    fn version_check() {
139        let major = unsafe { crate::aeron_version_major() };
140        let minor = unsafe { crate::aeron_version_minor() };
141        let patch = unsafe { crate::aeron_version_patch() };
142
143        let aeron_version = format!("{}.{}.{}", major, minor, patch);
144        let cargo_version = "1.50.2";
145        assert_eq!(aeron_version, cargo_version);
146    }
147
148    #[test]
149    fn send_message() -> Result<(), AeronCError> {
150        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
151        let topic = AERON_IPC_STREAM;
152        let stream_id = 32;
153
154        let aeron_context = AeronDriverContext::new()?;
155        aeron_context.set_dir_delete_on_shutdown(true)?;
156        aeron_context.set_dir_delete_on_start(true)?;
157
158        let (stop, _driver_handle) = AeronDriver::launch_embedded(aeron_context.clone(), false);
159
160        // aeron_driver
161        //     .conductor()
162        //     .context()
163        //     .print_configuration();
164        // aeron_driver.main_do_work()?;
165        info!("aeron dir: {:?}", aeron_context.get_dir());
166
167        let dir = aeron_context.get_dir().to_string();
168        let ctx = AeronContext::new()?;
169        ctx.set_dir(&dir.into_c_string())?;
170
171        let client = Aeron::new(&ctx)?;
172
173        #[derive(Default, Debug)]
174        struct ErrorCount {
175            error_count: usize,
176        }
177
178        impl AeronErrorHandlerCallback for ErrorCount {
179            fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
180                error!("Aeron error {}: {}", error_code, msg);
181                self.error_count += 1;
182            }
183        }
184
185        let mut error_handler = Handler::leak(ErrorCount::default());
186        ctx.set_error_handler(Some(&error_handler))?;
187
188        struct Test {}
189        impl AeronAvailableCounterCallback for Test {
190            fn handle_aeron_on_available_counter(
191                &mut self,
192                counters_reader: AeronCountersReader,
193                registration_id: i64,
194                counter_id: i32,
195            ) -> () {
196                info!("new counter counters_reader={counters_reader:?} registration_id={registration_id} counter_id={counter_id}");
197            }
198        }
199
200        impl AeronNewPublicationCallback for Test {
201            fn handle_aeron_on_new_publication(
202                &mut self,
203                async_: AeronAsyncAddPublication,
204                channel: &str,
205                stream_id: i32,
206                session_id: i32,
207                correlation_id: i64,
208            ) -> () {
209                info!("on new publication {async_:?} {channel} {stream_id} {session_id} {correlation_id}")
210            }
211        }
212        let mut handler = Handler::leak(Test {});
213        ctx.set_on_available_counter(Some(&handler))?;
214        ctx.set_on_new_publication(Some(&handler))?;
215
216        client.start()?;
217        info!("aeron driver started");
218        assert!(Aeron::epoch_clock() > 0);
219        assert!(Aeron::nano_clock() > 0);
220
221        let counter_async =
222            AeronAsyncAddCounter::new(&client, 2543543, "12312312".as_bytes(), "abcd")?;
223
224        let counter = counter_async.poll_blocking(Duration::from_secs(15))?;
225        unsafe {
226            *counter.addr() += 1;
227        }
228
229        let result = AeronAsyncAddPublication::new(&client, topic, stream_id)?;
230
231        let publication = result.poll_blocking(std::time::Duration::from_secs(15))?;
232
233        info!("publication channel: {:?}", publication.channel());
234        info!("publication stream_id: {:?}", publication.stream_id());
235        info!("publication status: {:?}", publication.channel_status());
236
237        drop(publication);
238        drop(counter);
239        drop(client);
240        stop.store(true, Ordering::SeqCst);
241        error_handler.release();
242        handler.release();
243
244        Ok(())
245    }
246
247    #[test]
248    pub fn test_debug() -> Result<(), Box<dyn std::error::Error>> {
249        let ctx = AeronDriverContext::new()?;
250
251        println!("{:#?}", ctx);
252
253        struct AgentStartHandler {
254            ctx: AeronDriverContext,
255        }
256
257        impl AeronAgentStartFuncCallback for AgentStartHandler {
258            fn handle_aeron_agent_on_start_func(&mut self, role: &str) -> () {
259                unsafe {
260                    aeron_set_thread_affinity_on_start(
261                        self.ctx.get_inner() as *mut _,
262                        std::ffi::CString::new(role).unwrap().into_raw(),
263                    );
264                }
265            }
266        }
267
268        let mut agent_handler = Handler::leak(AgentStartHandler { ctx: ctx.clone() });
269        ctx.set_agent_on_start_function(Some(&agent_handler))?;
270
271        println!("{:#?}", ctx);
272
273        agent_handler.release();
274        Ok(())
275    }
276}