rusteron_media_driver/
lib.rs1#![allow(improper_ctypes_definitions)]
2#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
9#[allow(improper_ctypes_definitions)]
19#[allow(unpredictable_function_pointer_comparisons)]
20pub mod bindings {
21 include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
22}
23
24use bindings::*;
25use log::info;
26use std::path::Path;
27use std::sync::atomic::{AtomicBool, Ordering};
28use std::sync::Arc;
29use std::thread::{sleep, JoinHandle};
30use std::time::Duration;
31
32include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
33include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
34
35unsafe impl Sync for AeronDriverContext {}
36unsafe impl Send for AeronDriverContext {}
37unsafe impl Sync for AeronDriver {}
38unsafe impl Send for AeronDriver {}
39
40impl AeronDriver {
41 pub fn launch_embedded(
42 aeron_context: AeronDriverContext,
43 register_sigint: bool,
44 ) -> (Arc<AtomicBool>, JoinHandle<Result<(), AeronCError>>) {
45 AeronDriver::wait_for_previous_media_driver_to_timeout(&aeron_context);
46
47 let stop = Arc::new(AtomicBool::new(false));
48 let stop_copy = stop.clone();
49 if register_sigint {
51 let stop_copy2 = stop.clone();
52 ctrlc::set_handler(move || {
53 stop_copy2.store(true, Ordering::SeqCst);
54 })
55 .expect("Error setting Ctrl-C handler");
56 }
57
58 let started = Arc::new(AtomicBool::new(false));
59 let started2 = started.clone();
60
61 let dir = aeron_context.get_dir().to_string();
62 info!("Starting media driver [dir={}]", dir);
63 let handle = std::thread::spawn(move || {
64 let aeron_context = aeron_context.clone();
65 let aeron_driver = AeronDriver::new(&aeron_context)?;
66 aeron_driver.start(true)?;
67
68 info!(
69 "Aeron driver started [dir={}]",
70 aeron_driver.context().get_dir()
71 );
72
73 started2.store(true, Ordering::SeqCst);
74
75 while !stop.load(Ordering::Acquire) {
77 while aeron_driver.main_do_work()? > 0 {
78 }
80 }
81
82 info!("stopping media driver");
83
84 Ok::<_, AeronCError>(())
85 });
86
87 while !started.load(Ordering::SeqCst) && !handle.is_finished() {
88 sleep(Duration::from_millis(100));
89 }
90
91 if handle.is_finished() {
92 panic!("failed to start media driver {:?}", handle.join())
93 }
94 info!("started media driver [dir={}]", dir);
95
96 (stop_copy, handle)
97 }
98
99 pub fn wait_for_previous_media_driver_to_timeout(aeron_context: &AeronDriverContext) {
102 if !aeron_context.get_dir_delete_on_start() {
103 let cnc_file = Path::new(aeron_context.get_dir()).join("cnc.dat");
104
105 if cnc_file.exists() {
106 let timeout = Duration::from_millis(aeron_context.get_driver_timeout_ms() * 2)
107 .as_nanos() as i64;
108
109 let mut duration = timeout;
110
111 if let Ok(md) = cnc_file.metadata() {
112 if let Ok(modified_time) = md.modified() {
113 if let Ok(took) = modified_time.elapsed() {
114 duration = took.as_nanos() as i64;
115 }
116 }
117 }
118
119 let delay = timeout - duration;
120
121 if delay > 0 {
122 let sleep_duration = Duration::from_nanos((delay + 1_000_000) as u64);
123 info!("cnc file already exists, will need to wait {sleep_duration:?} for timeout [file={cnc_file:?}]");
124 sleep(sleep_duration);
125 }
126 }
127 }
128 }
129}
130
131#[cfg(test)]
132mod tests {
133 use super::*;
134 use log::error;
135 use std::os::raw::c_int;
136 use std::sync::atomic::Ordering;
137 use std::time::Duration;
138
139 #[test]
140 fn version_check() {
141 let major = unsafe { crate::aeron_version_major() };
142 let minor = unsafe { crate::aeron_version_minor() };
143 let patch = unsafe { crate::aeron_version_patch() };
144
145 let aeron_version = format!("{}.{}.{}", major, minor, patch);
146 let cargo_version = "1.48.6";
147 assert_eq!(aeron_version, cargo_version);
148 }
149
150 #[test]
151 fn send_message() -> Result<(), AeronCError> {
152 let _ = env_logger::Builder::new()
153 .is_test(true)
154 .filter_level(log::LevelFilter::Info)
155 .try_init();
156 let topic = AERON_IPC_STREAM;
157 let stream_id = 32;
158
159 let aeron_context = AeronDriverContext::new()?;
160 aeron_context.set_dir_delete_on_shutdown(true)?;
161 aeron_context.set_dir_delete_on_start(true)?;
162
163 let (stop, _driver_handle) = AeronDriver::launch_embedded(aeron_context.clone(), false);
164
165 info!("aeron dir: {:?}", aeron_context.get_dir());
171
172 let dir = aeron_context.get_dir().to_string();
173 let ctx = AeronContext::new()?;
174 ctx.set_dir(&dir.into_c_string())?;
175
176 let client = Aeron::new(&ctx)?;
177
178 #[derive(Default, Debug)]
179 struct ErrorCount {
180 error_count: usize,
181 }
182
183 impl AeronErrorHandlerCallback for ErrorCount {
184 fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
185 error!("Aeron error {}: {}", error_code, msg);
186 self.error_count += 1;
187 }
188 }
189
190 let error_handler = Some(Handler::leak(ErrorCount::default()));
191 ctx.set_error_handler(error_handler.as_ref())?;
192
193 struct Test {}
194 impl AeronAvailableCounterCallback for Test {
195 fn handle_aeron_on_available_counter(
196 &mut self,
197 counters_reader: AeronCountersReader,
198 registration_id: i64,
199 counter_id: i32,
200 ) -> () {
201 info!("new counter counters_reader={counters_reader:?} registration_id={registration_id} counter_id={counter_id}");
202 }
203 }
204
205 impl AeronNewPublicationCallback for Test {
206 fn handle_aeron_on_new_publication(
207 &mut self,
208 async_: AeronAsyncAddPublication,
209 channel: &str,
210 stream_id: i32,
211 session_id: i32,
212 correlation_id: i64,
213 ) -> () {
214 info!("on new publication {async_:?} {channel} {stream_id} {session_id} {correlation_id}")
215 }
216 }
217 let handler = Some(Handler::leak(Test {}));
218 ctx.set_on_available_counter(handler.as_ref())?;
219 ctx.set_on_new_publication(handler.as_ref())?;
220
221 client.start()?;
222 info!("aeron driver started");
223 assert!(Aeron::epoch_clock() > 0);
224 assert!(Aeron::nano_clock() > 0);
225
226 let counter_async =
227 AeronAsyncAddCounter::new(&client, 2543543, "12312312".as_bytes(), "abcd")?;
228
229 let counter = counter_async.poll_blocking(Duration::from_secs(15))?;
230 unsafe {
231 *counter.addr() += 1;
232 }
233
234 let result = AeronAsyncAddPublication::new(&client, topic, stream_id)?;
235
236 let publication = result.poll_blocking(std::time::Duration::from_secs(15))?;
237
238 info!("publication channel: {:?}", publication.channel());
239 info!("publication stream_id: {:?}", publication.stream_id());
240 info!("publication status: {:?}", publication.channel_status());
241
242 stop.store(true, Ordering::SeqCst);
247
248 Ok(())
249 }
250
251 #[test]
252 pub fn test_debug() -> Result<(), Box<dyn std::error::Error>> {
253 let ctx = AeronDriverContext::new()?;
254
255 println!("{:#?}", ctx);
256
257 struct AgentStartHandler {
258 ctx: AeronDriverContext,
259 }
260
261 impl AeronAgentStartFuncCallback for AgentStartHandler {
262 fn handle_aeron_agent_on_start_func(&mut self, role: &str) -> () {
263 unsafe {
264 aeron_set_thread_affinity_on_start(
265 self.ctx.get_inner() as *mut _,
266 std::ffi::CString::new(role).unwrap().into_raw(),
267 );
268 }
269 }
270 }
271
272 ctx.set_agent_on_start_function(Some(&Handler::leak(AgentStartHandler {
273 ctx: ctx.clone(),
274 })))?;
275
276 println!("{:#?}", ctx);
277
278 Ok(())
279 }
280}