rusteron_media_driver/
lib.rs

1#![allow(improper_ctypes_definitions)]
2#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
9//! # Features
10//!
11//! - **`static`**: When enabled, this feature statically links the Aeron C code.
12//!   By default, the library uses dynamic linking to the Aeron C libraries.
13//! - **`backtrace`**: When enabled will log a backtrace for each AeronCError
14//! - **`extra-logging`**: When enabled will log when resource is created and destroyed. Useful if you're seeing a segfault due to a resource being closed
15//! - **`log-c-bindings`**: When enabled will log every C binding call with arguments and return values. Useful for debugging FFI interactions
16//! - **`precompile`**: When enabled will use precompiled C code instead of requiring cmake and java to be installed
17
18#[allow(improper_ctypes_definitions)]
19#[allow(unpredictable_function_pointer_comparisons)]
20pub mod bindings {
21    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
22}
23
24use bindings::*;
25use log::info;
26use std::path::Path;
27use std::sync::atomic::{AtomicBool, Ordering};
28use std::sync::Arc;
29use std::thread::{sleep, JoinHandle};
30use std::time::Duration;
31
32include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
33include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
34
35unsafe impl Sync for AeronDriverContext {}
36unsafe impl Send for AeronDriverContext {}
37unsafe impl Sync for AeronDriver {}
38unsafe impl Send for AeronDriver {}
39
40impl AeronDriver {
41    pub fn launch_embedded(
42        aeron_context: AeronDriverContext,
43        register_sigint: bool,
44    ) -> (Arc<AtomicBool>, JoinHandle<Result<(), AeronCError>>) {
45        AeronDriver::wait_for_previous_media_driver_to_timeout(&aeron_context);
46
47        let stop = Arc::new(AtomicBool::new(false));
48        let stop_copy = stop.clone();
49        // Register signal handler for SIGINT (Ctrl+C)
50        if register_sigint {
51            let stop_copy2 = stop.clone();
52            ctrlc::set_handler(move || {
53                stop_copy2.store(true, Ordering::SeqCst);
54            })
55            .expect("Error setting Ctrl-C handler");
56        }
57
58        let started = Arc::new(AtomicBool::new(false));
59        let started2 = started.clone();
60
61        let dir = aeron_context.get_dir().to_string();
62        info!("Starting media driver [dir={}]", dir);
63        let handle = std::thread::spawn(move || {
64            let aeron_context = aeron_context.clone();
65            let aeron_driver = AeronDriver::new(&aeron_context)?;
66            aeron_driver.start(true)?;
67
68            info!(
69                "Aeron driver started [dir={}]",
70                aeron_driver.context().get_dir()
71            );
72
73            started2.store(true, Ordering::SeqCst);
74
75            // Poll for work until Ctrl+C is pressed
76            while !stop.load(Ordering::Acquire) {
77                while aeron_driver.main_do_work()? > 0 {
78                    // busy spin
79                }
80            }
81
82            info!("stopping media driver");
83
84            Ok::<_, AeronCError>(())
85        });
86
87        while !started.load(Ordering::SeqCst) && !handle.is_finished() {
88            sleep(Duration::from_millis(100));
89        }
90
91        if handle.is_finished() {
92            panic!("failed to start media driver {:?}", handle.join())
93        }
94        info!("started media driver [dir={}]", dir);
95
96        (stop_copy, handle)
97    }
98
99    /// if you have existing shm files and its before the driver timeout it will try to reuse it and fail
100    /// this makes sure that if that is the case it will wait else it proceeds
101    pub fn wait_for_previous_media_driver_to_timeout(aeron_context: &AeronDriverContext) {
102        if !aeron_context.get_dir_delete_on_start() {
103            let cnc_file = Path::new(aeron_context.get_dir()).join("cnc.dat");
104
105            if cnc_file.exists() {
106                let timeout = Duration::from_millis(aeron_context.get_driver_timeout_ms() * 2)
107                    .as_nanos() as i64;
108
109                let mut duration = timeout;
110
111                if let Ok(md) = cnc_file.metadata() {
112                    if let Ok(modified_time) = md.modified() {
113                        if let Ok(took) = modified_time.elapsed() {
114                            duration = took.as_nanos() as i64;
115                        }
116                    }
117                }
118
119                let delay = timeout - duration;
120
121                if delay > 0 {
122                    let sleep_duration = Duration::from_nanos((delay + 1_000_000) as u64);
123                    info!("cnc file already exists, will need to wait {sleep_duration:?} for timeout [file={cnc_file:?}]");
124                    sleep(sleep_duration);
125                }
126            }
127        }
128    }
129}
130
#[cfg(test)]
mod tests {
    use super::*;
    use log::error;
    use std::os::raw::c_int;
    use std::sync::atomic::Ordering;
    use std::time::Duration;

    /// Checks that the version reported by the linked Aeron library matches the expected one.
    #[test]
    fn version_check() {
        let major = unsafe { crate::aeron_version_major() };
        let minor = unsafe { crate::aeron_version_minor() };
        let patch = unsafe { crate::aeron_version_patch() };

        let aeron_version = format!("{}.{}.{}", major, minor, patch);
        let cargo_version = "1.48.6";
        assert_eq!(aeron_version, cargo_version);
    }

    /// Launches an embedded driver, connects a client to it, then adds a counter and an IPC
    /// publication before asking the driver to stop.
    #[test]
    fn send_message() -> Result<(), AeronCError> {
        let _ = env_logger::Builder::new()
            .is_test(true)
            .filter_level(log::LevelFilter::Info)
            .try_init();
        let topic = AERON_IPC_STREAM;
        let stream_id = 32;

        let aeron_context = AeronDriverContext::new()?;
        aeron_context.set_dir_delete_on_shutdown(true)?;
        aeron_context.set_dir_delete_on_start(true)?;

        let (stop, _driver_handle) = AeronDriver::launch_embedded(aeron_context.clone(), false);

        info!("aeron dir: {:?}", aeron_context.get_dir());

        // Point the client at the same directory the driver is using.
        let dir = aeron_context.get_dir().to_string();
        let ctx = AeronContext::new()?;
        ctx.set_dir(&dir.into_c_string())?;

        let client = Aeron::new(&ctx)?;

        /// Counts the errors reported through the client's error handler.
        #[derive(Default, Debug)]
        struct ErrorCount {
            error_count: usize,
        }

        impl AeronErrorHandlerCallback for ErrorCount {
            fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
                error!("Aeron error {}: {}", error_code, msg);
                self.error_count += 1;
            }
        }

        let error_handler = Some(Handler::leak(ErrorCount::default()));
        ctx.set_error_handler(error_handler.as_ref())?;

        /// Logs counter-available and new-publication notifications.
        struct Test {}
        impl AeronAvailableCounterCallback for Test {
            fn handle_aeron_on_available_counter(
                &mut self,
                counters_reader: AeronCountersReader,
                registration_id: i64,
                counter_id: i32,
            ) {
                info!("new counter counters_reader={counters_reader:?} registration_id={registration_id} counter_id={counter_id}");
            }
        }

        impl AeronNewPublicationCallback for Test {
            fn handle_aeron_on_new_publication(
                &mut self,
                async_: AeronAsyncAddPublication,
                channel: &str,
                stream_id: i32,
                session_id: i32,
                correlation_id: i64,
            ) {
                info!("on new publication {async_:?} {channel} {stream_id} {session_id} {correlation_id}")
            }
        }
        let handler = Some(Handler::leak(Test {}));
        ctx.set_on_available_counter(handler.as_ref())?;
        ctx.set_on_new_publication(handler.as_ref())?;

        client.start()?;
        info!("aeron driver started");
        assert!(Aeron::epoch_clock() > 0);
        assert!(Aeron::nano_clock() > 0);

        let counter_async =
            AeronAsyncAddCounter::new(&client, 2543543, "12312312".as_bytes(), "abcd")?;

        let counter = counter_async.poll_blocking(Duration::from_secs(15))?;
        // SAFETY: assumes `addr()` yields a valid, writable pointer to the counter value for as
        // long as `counter` is alive — TODO confirm against the wrapper's contract.
        unsafe {
            *counter.addr() += 1;
        }

        let result = AeronAsyncAddPublication::new(&client, topic, stream_id)?;

        let publication = result.poll_blocking(Duration::from_secs(15))?;

        info!("publication channel: {:?}", publication.channel());
        info!("publication stream_id: {:?}", publication.stream_id());
        info!("publication status: {:?}", publication.channel_status());

        // TODO: exercise `try_claim` on the publication once a buffer claim can be built here.

        // Ask the embedded driver's work loop to exit.
        stop.store(true, Ordering::SeqCst);

        Ok(())
    }

    /// Prints the driver context's `Debug` output before and after installing an agent-start
    /// callback.
    #[test]
    pub fn test_debug() -> Result<(), Box<dyn std::error::Error>> {
        let ctx = AeronDriverContext::new()?;

        println!("{:#?}", ctx);

        /// Forwards agent-start notifications to `aeron_set_thread_affinity_on_start`.
        struct AgentStartHandler {
            ctx: AeronDriverContext,
        }

        impl AeronAgentStartFuncCallback for AgentStartHandler {
            fn handle_aeron_agent_on_start_func(&mut self, role: &str) {
                // Keep the C string owned by this frame so it is freed on return; `into_raw`
                // would leak one allocation per callback invocation.
                let role = std::ffi::CString::new(role)
                    .expect("agent role name must not contain interior NUL bytes");
                // SAFETY: `role` outlives the call, so the pointer stays valid for its duration.
                // Assumes the C function does not retain `role_name` after returning and that
                // `get_inner()` is the state pointer it expects — TODO confirm.
                unsafe {
                    aeron_set_thread_affinity_on_start(
                        self.ctx.get_inner() as *mut _,
                        role.as_ptr(),
                    );
                }
            }
        }

        ctx.set_agent_on_start_function(Some(&Handler::leak(AgentStartHandler {
            ctx: ctx.clone(),
        })))?;

        println!("{:#?}", ctx);

        Ok(())
    }
}