rusteron_media_driver/
lib.rs1#![allow(improper_ctypes_definitions)]
2#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
9#[allow(improper_ctypes_definitions)]
19#[allow(unpredictable_function_pointer_comparisons)]
20pub mod bindings {
21 include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
22}
23
24use bindings::*;
25use log::info;
26use std::path::Path;
27use std::sync::atomic::{AtomicBool, Ordering};
28use std::sync::Arc;
29use std::thread::{sleep, JoinHandle};
30use std::time::Duration;
31
32include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
33include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
34
35unsafe impl Sync for AeronDriverContext {}
36unsafe impl Send for AeronDriverContext {}
37unsafe impl Sync for AeronDriver {}
38unsafe impl Send for AeronDriver {}
39
40impl AeronDriver {
41 pub fn launch_embedded(
42 aeron_context: AeronDriverContext,
43 register_sigint: bool,
44 ) -> (Arc<AtomicBool>, JoinHandle<Result<(), AeronCError>>) {
45 AeronDriver::wait_for_previous_media_driver_to_timeout(&aeron_context);
46
47 let stop = Arc::new(AtomicBool::new(false));
48 let stop_copy = stop.clone();
49 if register_sigint {
51 let stop_copy2 = stop.clone();
52 ctrlc::set_handler(move || {
53 stop_copy2.store(true, Ordering::SeqCst);
54 })
55 .expect("Error setting Ctrl-C handler");
56 }
57
58 let started = Arc::new(AtomicBool::new(false));
59 let started2 = started.clone();
60
61 let dir = aeron_context.get_dir().to_string();
62 info!("Starting media driver [dir={}]", dir);
63 let handle = std::thread::spawn(move || {
64 let aeron_context = aeron_context.clone();
65 let aeron_driver = AeronDriver::new(&aeron_context)?;
66 aeron_driver.start(true)?;
67
68 info!(
69 "Aeron driver started [dir={}]",
70 aeron_driver.context().get_dir()
71 );
72
73 started2.store(true, Ordering::SeqCst);
74
75 while !stop.load(Ordering::Acquire) {
77 aeron_driver.main_idle_strategy(aeron_driver.main_do_work()?);
78 }
79
80 info!("stopping media driver");
81
82 Ok::<_, AeronCError>(())
83 });
84
85 while !started.load(Ordering::SeqCst) && !handle.is_finished() {
86 sleep(Duration::from_millis(100));
87 }
88
89 if handle.is_finished() {
90 panic!("failed to start media driver {:?}", handle.join())
91 }
92 info!("started media driver [dir={}]", dir);
93
94 (stop_copy, handle)
95 }
96
97 pub fn wait_for_previous_media_driver_to_timeout(aeron_context: &AeronDriverContext) {
100 if !aeron_context.get_dir_delete_on_start() {
101 let cnc_file = Path::new(aeron_context.get_dir()).join("cnc.dat");
102
103 if cnc_file.exists() {
104 let timeout = Duration::from_millis(aeron_context.get_driver_timeout_ms() * 2)
105 .as_nanos() as i64;
106
107 let mut duration = timeout;
108
109 if let Ok(md) = cnc_file.metadata() {
110 if let Ok(modified_time) = md.modified() {
111 if let Ok(took) = modified_time.elapsed() {
112 duration = took.as_nanos() as i64;
113 }
114 }
115 }
116
117 let delay = timeout - duration;
118
119 if delay > 0 {
120 let sleep_duration = Duration::from_nanos((delay + 1_000_000) as u64);
121 info!("cnc file already exists, will need to wait {sleep_duration:?} for timeout [file={cnc_file:?}]");
122 sleep(sleep_duration);
123 }
124 }
125 }
126 }
127}
128
#[cfg(test)]
mod tests {
    use super::*;
    use log::error;
    use std::os::raw::c_int;
    use std::sync::atomic::Ordering;
    use std::time::Duration;

    /// The version reported by the linked Aeron library must match the expected release.
    #[test]
    fn version_check() {
        let major = unsafe { crate::aeron_version_major() };
        let minor = unsafe { crate::aeron_version_minor() };
        let patch = unsafe { crate::aeron_version_patch() };

        let aeron_version = format!("{major}.{minor}.{patch}");
        let cargo_version = "1.50.2";
        assert_eq!(aeron_version, cargo_version);
    }

    /// Launches an embedded driver, connects a client, then adds a counter and an IPC
    /// publication before shutting everything down again.
    #[test]
    fn send_message() -> Result<(), AeronCError> {
        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
        let topic = AERON_IPC_STREAM;
        let stream_id = 32;

        let aeron_context = AeronDriverContext::new()?;
        aeron_context.set_dir_delete_on_shutdown(true)?;
        aeron_context.set_dir_delete_on_start(true)?;

        let (stop, _driver_handle) = AeronDriver::launch_embedded(aeron_context.clone(), false);

        info!("aeron dir: {:?}", aeron_context.get_dir());

        // Point the client at the directory used by the embedded driver.
        let dir = aeron_context.get_dir().to_string();
        let ctx = AeronContext::new()?;
        ctx.set_dir(&dir.into_c_string())?;

        let client = Aeron::new(&ctx)?;

        #[derive(Default, Debug)]
        struct ErrorCount {
            error_count: usize,
        }

        impl AeronErrorHandlerCallback for ErrorCount {
            fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
                error!("Aeron error {}: {}", error_code, msg);
                self.error_count += 1;
            }
        }

        let mut error_handler = Handler::leak(ErrorCount::default());
        ctx.set_error_handler(Some(&error_handler))?;

        struct Test {}
        impl AeronAvailableCounterCallback for Test {
            fn handle_aeron_on_available_counter(
                &mut self,
                counters_reader: AeronCountersReader,
                registration_id: i64,
                counter_id: i32,
            ) {
                info!("new counter counters_reader={counters_reader:?} registration_id={registration_id} counter_id={counter_id}");
            }
        }

        impl AeronNewPublicationCallback for Test {
            fn handle_aeron_on_new_publication(
                &mut self,
                async_: AeronAsyncAddPublication,
                channel: &str,
                stream_id: i32,
                session_id: i32,
                correlation_id: i64,
            ) {
                info!("on new publication {async_:?} {channel} {stream_id} {session_id} {correlation_id}")
            }
        }
        let mut handler = Handler::leak(Test {});
        ctx.set_on_available_counter(Some(&handler))?;
        ctx.set_on_new_publication(Some(&handler))?;

        client.start()?;
        info!("aeron driver started");
        assert!(Aeron::epoch_clock() > 0);
        assert!(Aeron::nano_clock() > 0);

        let counter_async =
            AeronAsyncAddCounter::new(&client, 2543543, "12312312".as_bytes(), "abcd")?;

        let counter = counter_async.poll_blocking(Duration::from_secs(15))?;
        unsafe {
            *counter.addr() += 1;
        }

        let result = AeronAsyncAddPublication::new(&client, topic, stream_id)?;

        let publication = result.poll_blocking(Duration::from_secs(15))?;

        info!("publication channel: {:?}", publication.channel());
        info!("publication stream_id: {:?}", publication.stream_id());
        info!("publication status: {:?}", publication.channel_status());

        // Tear down client-side resources first, then ask the driver loop to stop and
        // release the handlers that were leaked above.
        drop(publication);
        drop(counter);
        drop(client);
        stop.store(true, Ordering::SeqCst);
        error_handler.release();
        handler.release();

        Ok(())
    }

    /// Debug-prints a driver context before and after an agent-start callback is installed.
    #[test]
    pub fn test_debug() -> Result<(), Box<dyn std::error::Error>> {
        let ctx = AeronDriverContext::new()?;

        println!("{:#?}", ctx);

        struct AgentStartHandler {
            ctx: AeronDriverContext,
        }

        impl AeronAgentStartFuncCallback for AgentStartHandler {
            fn handle_aeron_agent_on_start_func(&mut self, role: &str) {
                // Keep the C string owned here so it is freed when the call returns;
                // `CString::into_raw` would leak one allocation per invocation.
                // NOTE(review): assumes the C function only reads `role_name` during the
                // call and does not retain the pointer — confirm against the Aeron sources.
                let role_name = std::ffi::CString::new(role)
                    .expect("agent role name must not contain NUL bytes");
                unsafe {
                    aeron_set_thread_affinity_on_start(
                        self.ctx.get_inner() as *mut _,
                        role_name.as_ptr().cast_mut(),
                    );
                }
            }
        }

        let mut agent_handler = Handler::leak(AgentStartHandler { ctx: ctx.clone() });
        ctx.set_agent_on_start_function(Some(&agent_handler))?;

        println!("{:#?}", ctx);

        agent_handler.release();
        Ok(())
    }
}