rusteron_client/
lib.rs

1#![allow(improper_ctypes_definitions)]
2#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
9//! # Features
10//!
11//! - **`static`**: When enabled, this feature statically links the Aeron C code.
12//!   By default, the library uses dynamic linking to the Aeron C libraries.
//! - **`backtrace`**: When enabled, logs a backtrace for each `AeronCError`.
//! - **`extra-logging`**: When enabled, logs whenever a resource is created or destroyed. Useful if you're seeing a segfault due to a resource being closed.
//! - **`log-c-bindings`**: When enabled, logs every C binding call with its arguments and return values. Useful for debugging FFI interactions.
//! - **`precompile`**: When enabled, uses precompiled C code instead of requiring cmake and java to be installed.
17
18#[allow(improper_ctypes_definitions)]
19#[allow(unpredictable_function_pointer_comparisons)]
20pub mod bindings {
21    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
22}
23
24use bindings::*;
25
26include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
27include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
28
29#[cfg(test)]
30mod tests {
31    use super::*;
32    use crate::test_alloc::current_allocs;
33    use hdrhistogram::Histogram;
34    use log::{error, info};
35    use rusteron_media_driver::AeronDriverContext;
36    use serial_test::serial;
37    use std::error;
38    use std::error::Error;
39    use std::io::Write;
40    use std::os::raw::c_int;
41    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
42    use std::sync::Arc;
43    use std::thread::{sleep, JoinHandle};
44    use std::time::{Duration, Instant};
45
46    #[derive(Default, Debug)]
47    struct ErrorCount {
48        error_count: usize,
49    }
50
51    impl AeronErrorHandlerCallback for ErrorCount {
52        fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
53            error!("Aeron error {}: {}", error_code, msg);
54            self.error_count += 1;
55        }
56    }
57
58    fn running_under_valgrind() -> bool {
59        std::env::var_os("RUSTERON_VALGRIND").is_some()
60    }
61
62    #[test]
63    #[serial]
64    fn version_check() -> Result<(), Box<dyn error::Error>> {
65        unsafe {
66            aeron_randomised_int32();
67        }
68        let alloc_count = current_allocs();
69
70        {
71            let major = unsafe { crate::aeron_version_major() };
72            let minor = unsafe { crate::aeron_version_minor() };
73            let patch = unsafe { crate::aeron_version_patch() };
74
75            let cargo_version = "1.50.2";
76            let aeron_version = format!("{}.{}.{}", major, minor, patch);
77            assert_eq!(aeron_version, cargo_version);
78
79            let ctx = AeronContext::new()?;
80            let error_count = 1;
81            let mut handler = Handler::leak(ErrorCount::default());
82            ctx.set_error_handler(Some(&handler))?;
83
84            assert!(Aeron::epoch_clock() > 0);
85            drop(ctx);
86            assert!(handler.should_drop);
87            handler.release();
88            assert!(!handler.should_drop);
89            drop(handler);
90        }
91
92        assert!(
93            current_allocs() <= alloc_count,
94            "allocations {} > {alloc_count}",
95            current_allocs()
96        );
97
98        Ok(())
99    }
100
101    #[test]
102    #[serial]
103    fn async_publication_invalid_interface_poll_then_drop() -> Result<(), Box<dyn error::Error>> {
104        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
105
106        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
107        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
108        media_driver_ctx.set_dir_delete_on_start(true)?;
109        media_driver_ctx.set_dir(
110            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
111        )?;
112        let (stop, driver_handle) =
113            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
114
115        let ctx = AeronContext::new()?;
116        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
117        let mut error_handler = Handler::leak(ErrorCount::default());
118        ctx.set_error_handler(Some(&error_handler))?;
119        let aeron = Aeron::new(&ctx)?;
120        aeron.start()?;
121
122        let channel = String::from("aeron:udp?endpoint=203.0.113.1:54321");
123
124        // Create async publication and subscription pollers on the same invalid channel and
125        // attempt to resolve them. If both are created, try a small send/receive cycle and then exit.
126        let pub_poller = aeron.async_add_publication(&channel.clone().into_c_string(), 4321)?;
127        let sub_poller = aeron.async_add_subscription(
128            &channel.into_c_string(),
129            4321,
130            Handlers::no_available_image_handler(),
131            Handlers::no_unavailable_image_handler(),
132        )?;
133
134        let mut publication: Option<AeronPublication> = None;
135        let mut subscription: Option<AeronSubscription> = None;
136        let start = Instant::now();
137        while start.elapsed() < Duration::from_secs(2) {
138            if publication.is_none() {
139                match pub_poller.poll() {
140                    Ok(Some(p)) => publication = Some(p),
141                    Ok(None) | Err(_) => {}
142                }
143            }
144            if subscription.is_none() {
145                match sub_poller.poll() {
146                    Ok(Some(s)) => subscription = Some(s),
147                    Ok(None) | Err(_) => {}
148                }
149            }
150            if publication.is_some() && subscription.is_some() {
151                break;
152            }
153            #[cfg(debug_assertions)]
154            std::thread::sleep(Duration::from_millis(10));
155        }
156
157        info!("publication: {:?}", publication);
158        info!("subscription: {:?}", subscription);
159
160        if let (Some(publisher), Some(subscription)) = (publication, subscription) {
161            let payload = b"hello-aeron";
162            let send_start = Instant::now();
163            let mut sent = false;
164            while send_start.elapsed() < Duration::from_millis(500) {
165                let res = publisher.offer(payload, Handlers::no_reserved_value_supplier_handler());
166                if res >= payload.len() as i64 {
167                    sent = true;
168                    info!("sent {:?}", payload);
169                    break;
170                }
171                std::thread::sleep(Duration::from_millis(10));
172            }
173
174            if sent {
175                let mut got = false;
176                let read_start = Instant::now();
177                while read_start.elapsed() < Duration::from_millis(500) {
178                    let _ = subscription.poll_once(
179                        |msg, _hdr| {
180                            if msg == payload {
181                                got = true;
182                                info!("received {:?}", payload);
183                            }
184                        },
185                        1024,
186                    );
187                    if got {
188                        break;
189                    }
190                    std::thread::sleep(Duration::from_millis(10));
191                }
192                // We don't assert on got, just exercise the path.
193            }
194        }
195
196        // Shutdown
197        drop(aeron);
198        stop.store(true, Ordering::SeqCst);
199        let _ = driver_handle.join().unwrap();
200        error_handler.release();
201        Ok(())
202    }
203
204    #[test]
205    #[serial]
206    fn async_pub_sub_invalid_endpoint_create_drop_stress() -> Result<(), Box<dyn error::Error>> {
207        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
208
209        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
210        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
211        media_driver_ctx.set_dir_delete_on_start(true)?;
212        media_driver_ctx.set_dir(
213            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
214        )?;
215        let (stop, driver_handle) =
216            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
217
218        let ctx = AeronContext::new()?;
219        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
220        let mut error_handler = Handler::leak(ErrorCount::default());
221        ctx.set_error_handler(Some(&error_handler))?;
222        let aeron = Aeron::new(&ctx)?;
223        aeron.start()?;
224
225        const STRESS_ITERS: u16 = 60;
226        const POLL_TIMEOUT: Duration = Duration::from_secs(10);
227        const POLL_SLEEP: Duration = Duration::from_millis(10);
228
229        // Stress: repeatedly create async pub/sub on an invalid endpoint and drive each
230        // poller to a terminal state before dropping it.
231        for i in 0..STRESS_ITERS {
232            let port = 55000u16 + i;
233            let channel = format!("aeron:udp?endpoint=203.0.113.1:{}", port);
234            let pub_poller =
235                aeron.async_add_publication(&channel.clone().into_c_string(), 4500 + i as i32)?;
236            let sub_poller = aeron.async_add_subscription(
237                &channel.into_c_string(),
238                4500 + i as i32,
239                Handlers::no_available_image_handler(),
240                Handlers::no_unavailable_image_handler(),
241            )?;
242
243            let start = Instant::now();
244            let mut publication = None;
245            let mut publication_done = false;
246            let mut subscription = None;
247            let mut subscription_done = false;
248
249            while !(publication_done && subscription_done) && start.elapsed() < POLL_TIMEOUT {
250                if !publication_done {
251                    match pub_poller.poll() {
252                        Ok(Some(pub_)) => {
253                            publication = Some(pub_);
254                            publication_done = true;
255                        }
256                        Ok(None) => {}
257                        Err(err) => {
258                            info!("publication async add finished with error on iteration {i}: {err:?}");
259                            publication_done = true;
260                        }
261                    }
262                }
263
264                if !subscription_done {
265                    match sub_poller.poll() {
266                        Ok(Some(sub_)) => {
267                            subscription = Some(sub_);
268                            subscription_done = true;
269                        }
270                        Ok(None) => {}
271                        Err(err) => {
272                            info!("subscription async add finished with error on iteration {i}: {err:?}");
273                            subscription_done = true;
274                        }
275                    }
276                }
277
278                if !(publication_done && subscription_done) {
279                    std::thread::sleep(POLL_SLEEP);
280                }
281            }
282
283            assert!(
284                publication_done,
285                "publication async add did not complete on iteration {i} within {:?}",
286                POLL_TIMEOUT
287            );
288            assert!(
289                subscription_done,
290                "subscription async add did not complete on iteration {i} within {:?}",
291                POLL_TIMEOUT
292            );
293
294            drop(subscription);
295            drop(publication);
296            drop(sub_poller);
297            drop(pub_poller);
298        }
299
300        drop(aeron);
301        stop.store(true, Ordering::SeqCst);
302        let _ = driver_handle.join().unwrap();
303        error_handler.release();
304        Ok(())
305    }
306
307    #[test]
308    #[serial]
309    // // #[ignore] // TODO FIXME broken test
310    fn async_subscription_invalid_interface_poll_then_drop() -> Result<(), Box<dyn error::Error>> {
311        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
312
313        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
314        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
315        media_driver_ctx.set_dir_delete_on_start(true)?;
316        media_driver_ctx.set_dir(
317            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
318        )?;
319        let (stop, driver_handle) =
320            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
321
322        let ctx = AeronContext::new()?;
323        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
324        let mut error_handler = Handler::leak(ErrorCount::default());
325        ctx.set_error_handler(Some(&error_handler))?;
326        let aeron = Aeron::new(&ctx)?;
327        aeron.start()?;
328
329        // Invalid remote endpoint only (no interface)
330        let channel = String::from("aeron:udp?endpoint=203.0.113.1:54323");
331
332        let poller = aeron.async_add_subscription(
333            &channel.into_c_string(),
334            4323,
335            Handlers::no_available_image_handler(),
336            Handlers::no_unavailable_image_handler(),
337        )?;
338
339        let start = Instant::now();
340        while start.elapsed() < Duration::from_millis(250) {
341            let _ = poller.poll();
342            #[cfg(debug_assertions)]
343            std::thread::sleep(Duration::from_millis(10));
344        }
345
346        drop(poller);
347        drop(aeron);
348        stop.store(true, Ordering::SeqCst);
349        let _ = driver_handle.join().unwrap();
350        error_handler.release();
351        Ok(())
352    }
353
354    #[test]
355    #[serial]
356    fn blocking_add_subscription_invalid_interface_timeout() -> Result<(), Box<dyn error::Error>> {
357        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
358
359        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
360        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
361        media_driver_ctx.set_dir_delete_on_start(true)?;
362        media_driver_ctx.set_dir(
363            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
364        )?;
365        let (stop, driver_handle) =
366            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
367
368        let ctx = AeronContext::new()?;
369        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
370        let mut error_handler = Handler::leak(ErrorCount::default());
371        ctx.set_error_handler(Some(&error_handler))?;
372        let aeron = Aeron::new(&ctx)?;
373        aeron.start()?;
374
375        let channel = String::from("aeron:udp?endpoint=203.0.113.1:54324");
376
377        let result = aeron.add_subscription(
378            &channel.into_c_string(),
379            4324,
380            Handlers::no_available_image_handler(),
381            Handlers::no_unavailable_image_handler(),
382            Duration::from_millis(300),
383        );
384
385        assert!(result.is_err(), "expected error for invalid interface");
386        drop(aeron);
387        stop.store(true, Ordering::SeqCst);
388        let _ = driver_handle.join().unwrap();
389        error_handler.release();
390        Ok(())
391    }
392
393    #[test]
394    #[serial]
395    fn async_publication_invalid_bind_poll_then_drop() -> Result<(), Box<dyn error::Error>> {
396        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
397
398        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
399        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
400        media_driver_ctx.set_dir_delete_on_start(true)?;
401        media_driver_ctx.set_dir(
402            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
403        )?;
404        let (stop, driver_handle) =
405            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
406
407        let ctx = AeronContext::new()?;
408        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
409        let mut error_handler = Handler::leak(ErrorCount::default());
410        ctx.set_error_handler(Some(&error_handler))?;
411        let aeron = Aeron::new(&ctx)?;
412        aeron.start()?;
413
414        // Use an invalid bind on publication (bind is not valid for publication, and the IP is unowned).
415        let channel = format!("aeron:udp?endpoint=127.0.0.1:54330|bind=203.0.113.1:60000");
416
417        let poller = aeron.async_add_publication(&channel.into_c_string(), 4330)?;
418        let start = Instant::now();
419        while start.elapsed() < Duration::from_millis(250) {
420            let _ = poller.poll();
421            #[cfg(debug_assertions)]
422            std::thread::sleep(Duration::from_millis(10));
423        }
424        drop(poller);
425        drop(aeron);
426        stop.store(true, Ordering::SeqCst);
427        let _ = driver_handle.join().unwrap();
428        error_handler.release();
429        Ok(())
430    }
431
432    #[test]
433    #[serial]
434    pub fn simple_large_send() -> Result<(), Box<dyn error::Error>> {
435        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
436        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
437        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
438        media_driver_ctx.set_dir_delete_on_start(true)?;
439        media_driver_ctx.set_dir(
440            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
441        )?;
442        // Under Valgrind execution is ~10x slower; increase liveness timeouts so the
443        // driver doesn't evict the client before the test finishes.
444        media_driver_ctx.set_client_liveness_timeout_ns(60_000_000_000)?; // 60 s
445        media_driver_ctx.set_image_liveness_timeout_ns(60_000_000_000)?; // 60 s
446        media_driver_ctx.set_publication_unblock_timeout_ns(65_000_000_000)?; // 65 s
447        media_driver_ctx.set_driver_timeout_ms(60_000)?; // 60 s
448        let (stop, driver_handle) =
449            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
450
451        let ctx = AeronContext::new()?;
452        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
453        assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
454        // Keep client-side keepalive threshold aligned with the slow Valgrind environment.
455        ctx.set_driver_timeout_ms(60_000)?;
456        // Store all handlers so we can call release() after Aeron is stopped.
457        // Anonymous Handler::leak() temporaries would be dropped without release(),
458        // triggering the Drop impl warning and leaving heap allocations permanently orphaned.
459        let mut error_handler = Handler::leak(ErrorCount::default());
460        let mut new_pub_handler = Handler::leak(AeronNewPublicationLogger);
461        let mut avail_counter_handler1 = Handler::leak(AeronAvailableCounterLogger);
462        let mut close_client_handler = Handler::leak(AeronCloseClientLogger);
463        let mut new_sub_handler = Handler::leak(AeronNewSubscriptionLogger);
464        let mut unavail_counter_handler = Handler::leak(AeronUnavailableCounterLogger);
465        let mut avail_counter_handler2 = Handler::leak(AeronAvailableCounterLogger);
466        let mut excl_pub_handler = Handler::leak(AeronNewPublicationLogger);
467        ctx.set_error_handler(Some(&error_handler))?;
468        ctx.set_on_new_publication(Some(&new_pub_handler))?;
469        ctx.set_on_available_counter(Some(&avail_counter_handler1))?;
470        ctx.set_on_close_client(Some(&close_client_handler))?;
471        ctx.set_on_new_subscription(Some(&new_sub_handler))?;
472        ctx.set_on_unavailable_counter(Some(&unavail_counter_handler))?;
473        ctx.set_on_available_counter(Some(&avail_counter_handler2))?;
474        ctx.set_on_new_exclusive_publication(Some(&excl_pub_handler))?;
475
476        info!("creating client [simple_large_send test]");
477        let aeron = Aeron::new(&ctx)?;
478        info!("starting client");
479
480        aeron.start()?;
481        info!("client started");
482        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
483        info!("created publisher");
484
485        assert!(AeronCncMetadata::load_from_file(ctx.get_dir())?.pid > 0);
486        let cstr = std::ffi::CString::new(ctx.get_dir()).unwrap();
487        AeronCncMetadata::read_from_file(&cstr, |cnc| {
488            assert!(cnc.pid > 0);
489        })?;
490        assert!(AeronCnc::new_on_heap(ctx.get_dir())?.get_to_driver_heartbeat_ms()? > 0);
491        let cstr = std::ffi::CString::new(ctx.get_dir()).unwrap();
492        for _ in 0..50 {
493            AeronCnc::read_on_partial_stack(&cstr, |cnc| {
494                assert!(cnc.get_to_driver_heartbeat_ms().unwrap() > 0);
495            })?;
496        }
497
498        let subscription = aeron.add_subscription(
499            AERON_IPC_STREAM,
500            123,
501            Handlers::no_available_image_handler(),
502            Handlers::no_unavailable_image_handler(),
503            Duration::from_secs(5),
504        )?;
505        info!("created subscription");
506
507        subscription
508            .poll_once(|msg, header| println!("foo"), 1024)
509            .unwrap();
510
511        // pick a large enough size to confirm fragement assembler is working
512        let string_len = media_driver_ctx.ipc_mtu_length * 100;
513        info!("string length: {}", string_len);
514
515        let stop_publisher = Arc::new(AtomicBool::new(false));
516
517        let publisher_handler = {
518            let stop_publisher = stop_publisher.clone();
519            std::thread::spawn(move || {
520                let binding = "1".repeat(string_len);
521                let large_msg = binding.as_bytes();
522                loop {
523                    if stop_publisher.load(Ordering::Acquire) || publisher.is_closed() {
524                        break;
525                    }
526                    let result =
527                        publisher.offer(large_msg, Handlers::no_reserved_value_supplier_handler());
528
529                    assert_eq!(123, publisher.get_constants().unwrap().stream_id);
530
531                    if result < large_msg.len() as i64 {
532                        let error = AeronCError::from_code(result as i32);
533                        match error.kind() {
534                            AeronErrorType::PublicationBackPressured
535                            | AeronErrorType::PublicationAdminAction => {
536                                // ignore
537                            }
538                            _ => {
539                                error!(
540                                    "ERROR: failed to send message {:?}",
541                                    AeronCError::from_code(result as i32)
542                                );
543                            }
544                        }
545                        sleep(Duration::from_millis(500));
546                    }
547                }
548                info!("stopping publisher thread");
549            })
550        };
551
552        let mut assembler = AeronFragmentClosureAssembler::new()?;
553
554        struct Context {
555            count: Arc<AtomicUsize>,
556            stop: Arc<AtomicBool>,
557            string_len: usize,
558        }
559
560        let count = Arc::new(AtomicUsize::new(0usize));
561        let mut context = Context {
562            count: count.clone(),
563            stop: stop.clone(),
564            string_len,
565        };
566
567        // Start the timer
568        let start_time = Instant::now();
569
570        // Use break-with-value so cleanup (handler release, driver stop) always runs.
571        // 120-second timeout: under Valgrind execution is ~10× slower, so 30 s is too tight.
572        let loop_result: Result<(), Box<dyn error::Error>> = loop {
573            if start_time.elapsed() > Duration::from_secs(120) {
574                info!("Failed: exceeded 120-second timeout");
575                break Err(Box::new(std::io::Error::new(
576                    std::io::ErrorKind::TimedOut,
577                    "Timeout exceeded",
578                )));
579            }
580            let c = count.load(Ordering::SeqCst);
581            if c > 100 {
582                break Ok(());
583            }
584
585            fn process_msg(ctx: &mut Context, buffer: &[u8], header: AeronHeader) {
586                ctx.count.fetch_add(1, Ordering::SeqCst);
587
588                let values = header.get_values().unwrap();
589                assert_ne!(values.frame.session_id, 0);
590
591                if buffer.len() != ctx.string_len {
592                    ctx.stop.store(true, Ordering::SeqCst);
593                    error!(
594                        "ERROR: message was {} but was expecting {} [header={:?}]",
595                        buffer.len(),
596                        ctx.string_len,
597                        header
598                    );
599                    sleep(Duration::from_secs(1));
600                }
601
602                assert_eq!(buffer.len(), ctx.string_len);
603                assert_eq!(buffer, "1".repeat(ctx.string_len).as_bytes());
604            }
605
606            subscription.poll(assembler.process(&mut context, process_msg), 128)?;
607            assert_eq!(123, subscription.get_constants().unwrap().stream_id);
608        };
609
610        subscription.close(Handlers::no_notification_handler())?;
611
612        info!("stopping client");
613        stop_publisher.store(true, Ordering::SeqCst);
614
615        let _ = publisher_handler.join().unwrap();
616        drop(subscription);
617        drop(aeron);
618
619        stop.store(true, Ordering::SeqCst);
620        let _ = driver_handle.join().unwrap();
621
622        // Release all context handlers now that Aeron and the driver are fully stopped.
623        error_handler.release();
624        new_pub_handler.release();
625        avail_counter_handler1.release();
626        close_client_handler.release();
627        new_sub_handler.release();
628        unavail_counter_handler.release();
629        avail_counter_handler2.release();
630        excl_pub_handler.release();
631
632        let cnc = AeronCnc::new_on_heap(ctx.get_dir())?;
633        cnc.counters_reader().foreach_counter_once(
634            |value: i64, id: i32, type_id: i32, key: &[u8], label: &str| {
635                println!("counter reader id={id}, type_id={type_id}, key={key:?}, label={label}, value={value} [type={:?}]",
636                AeronSystemCounterType::try_from(type_id));
637            },
638        );
639        cnc.error_log_read_once(| observation_count: i32,
640                                     first_observation_timestamp: i64,
641                                     last_observation_timestamp: i64,
642                                     error: &str| {
643            println!("error: {error} observationCount={observation_count}, first_observation_timestamp={first_observation_timestamp}, last_observation_timestamp={last_observation_timestamp}");
644        }, 0);
645        cnc.loss_reporter_read_once(|    observation_count: i64,
646                                    total_bytes_lost: i64,
647                                    first_observation_timestamp: i64,
648                                    last_observation_timestamp: i64,
649                                    session_id: i32,
650                                    stream_id: i32,
651                                    channel: &str,
652                                    source: &str,| {
653            println!("loss reporter observationCount={observation_count}, totalBytesLost={total_bytes_lost}, first_observed={first_observation_timestamp}, last_observed={last_observation_timestamp}, session_id={session_id}, stream_id={stream_id}, channel={channel} source={source}");
654        })?;
655
656        loop_result?;
657        Ok(())
658    }
659
660    #[test]
661    #[serial]
662    pub fn try_claim() -> Result<(), Box<dyn error::Error>> {
663        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
664        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
665        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
666        media_driver_ctx.set_dir_delete_on_start(true)?;
667        media_driver_ctx.set_dir(
668            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
669        )?;
670        // Under Valgrind execution is ~10x slower; increase liveness timeouts so the
671        // driver doesn't evict the client before the test finishes.
672        media_driver_ctx.set_client_liveness_timeout_ns(60_000_000_000)?; // 60 s
673        media_driver_ctx.set_image_liveness_timeout_ns(60_000_000_000)?; // 60 s
674        media_driver_ctx.set_publication_unblock_timeout_ns(65_000_000_000)?; // 65 s
675        media_driver_ctx.set_driver_timeout_ms(60_000)?; // 60 s
676        let (stop, driver_handle) =
677            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
678
679        let ctx = AeronContext::new()?;
680        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
681        assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
682        // Keep client-side keepalive threshold aligned with slow Valgrind environment.
683        ctx.set_driver_timeout_ms(60_000)?;
684        let mut error_handler = Handler::leak(ErrorCount::default());
685        ctx.set_error_handler(Some(&error_handler))?;
686
687        info!("creating client [try_claim test]");
688        let aeron = Aeron::new(&ctx)?;
689        info!("starting client");
690
691        aeron.start()?;
692        info!("client started");
693        const STREAM_ID: i32 = 123;
694        let publisher =
695            aeron.add_publication(AERON_IPC_STREAM, STREAM_ID, Duration::from_secs(5))?;
696        info!("created publisher");
697
698        let subscription = aeron.add_subscription(
699            AERON_IPC_STREAM,
700            STREAM_ID,
701            Handlers::no_available_image_handler(),
702            Handlers::no_unavailable_image_handler(),
703            Duration::from_secs(5),
704        )?;
705        info!("created subscription");
706
707        // pick a large enough size to confirm fragement assembler is working
708        let string_len = 156;
709        info!("string length: {}", string_len);
710
711        let stop_publisher = Arc::new(AtomicBool::new(false));
712
713        let publisher_handler = {
714            let stop_publisher = stop_publisher.clone();
715            std::thread::spawn(move || {
716                let binding = "1".repeat(string_len);
717                let msg = binding.as_bytes();
718                let buffer = AeronBufferClaim::default();
719                loop {
720                    if stop_publisher.load(Ordering::Acquire) || publisher.is_closed() {
721                        break;
722                    }
723
724                    let result = publisher.try_claim(string_len, &buffer);
725
726                    if result < msg.len() as i64 {
727                        error!(
728                            "ERROR: failed to send message {:?}",
729                            AeronCError::from_code(result as i32)
730                        );
731                    } else {
732                        buffer.data().write_all(&msg).unwrap();
733                        buffer.commit().unwrap();
734                    }
735                }
736                info!("stopping publisher thread");
737            })
738        };
739
740        let count = Arc::new(AtomicUsize::new(0usize));
741        let count_copy = Arc::clone(&count);
742        let stop2 = stop.clone();
743
744        struct FragmentHandler {
745            count_copy: Arc<AtomicUsize>,
746            stop2: Arc<AtomicBool>,
747            string_len: usize,
748        }
749
750        impl AeronFragmentHandlerCallback for FragmentHandler {
751            fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], header: AeronHeader) {
752                assert_eq!(STREAM_ID, header.get_values().unwrap().frame.stream_id);
753                let header = header.get_values().unwrap();
754                let frame = header.frame();
755                let stream_id = frame.stream_id();
756                assert_eq!(STREAM_ID, stream_id);
757
758                self.count_copy.fetch_add(1, Ordering::SeqCst);
759
760                if buffer.len() != self.string_len {
761                    self.stop2.store(true, Ordering::SeqCst);
762                    error!(
763                        "ERROR: message was {} but was expecting {} [header={:?}]",
764                        buffer.len(),
765                        self.string_len,
766                        header
767                    );
768                    sleep(Duration::from_secs(1));
769                }
770
771                assert_eq!(buffer.len(), self.string_len);
772                assert_eq!(buffer, "1".repeat(self.string_len).as_bytes());
773            }
774        }
775
776        let (mut closure, mut inner_handler) =
777            Handler::leak_with_fragment_assembler(FragmentHandler {
778                count_copy,
779                stop2,
780                string_len,
781            })?;
782        let loop_result: Result<(), Box<dyn error::Error>> = {
783            let start_time = Instant::now();
784            loop {
785                if start_time.elapsed() > Duration::from_secs(120) {
786                    info!("Failed: exceeded 120-second timeout");
787                    break Err(Box::new(std::io::Error::new(
788                        std::io::ErrorKind::TimedOut,
789                        "Timeout exceeded",
790                    )));
791                }
792                let c = count.load(Ordering::SeqCst);
793                if c > 100 {
794                    break Ok(());
795                }
796                subscription.poll(Some(&closure), 128)?;
797            }
798        };
799
800        info!("stopping client");
801
802        stop_publisher.store(true, Ordering::SeqCst);
803
804        let _ = publisher_handler.join().unwrap();
805        drop(subscription);
806        drop(aeron);
807
808        stop.store(true, Ordering::SeqCst);
809        let _ = driver_handle.join().unwrap();
810        closure.release();
811        inner_handler.release();
812        error_handler.release();
813        loop_result?;
814        Ok(())
815    }
816
817    #[test]
818    #[serial]
819    pub fn counters() -> Result<(), Box<dyn error::Error>> {
820        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
821        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
822        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
823        media_driver_ctx.set_dir_delete_on_start(true)?;
824        media_driver_ctx.set_dir(
825            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
826        )?;
827        let (stop, driver_handle) =
828            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
829
830        let ctx = AeronContext::new()?;
831        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
832        assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
833        let mut error_handler = Handler::leak(ErrorCount::default());
834        ctx.set_error_handler(Some(&error_handler))?;
835        let mut unavailable_counter_handler = Handler::leak(AeronUnavailableCounterLogger);
836        ctx.set_on_unavailable_counter(Some(&unavailable_counter_handler))?;
837
838        struct AvailableCounterHandler {
839            found_counter: bool,
840        }
841
842        impl AeronAvailableCounterCallback for AvailableCounterHandler {
843            fn handle_aeron_on_available_counter(
844                &mut self,
845                counters_reader: AeronCountersReader,
846                registration_id: i64,
847                counter_id: i32,
848            ) -> () {
849                info!(
850            "on counter key={:?}, label={:?} registration_id={registration_id}, counter_id={counter_id}, value={}, {counters_reader:?}",
851            String::from_utf8(counters_reader.get_counter_key(counter_id).unwrap()),
852            counters_reader.get_counter_label(counter_id, 1000),
853            counters_reader.addr(counter_id)
854        );
855
856                assert_eq!(
857                    counters_reader.counter_registration_id(counter_id).unwrap(),
858                    registration_id
859                );
860
861                if let Ok(label) = counters_reader.get_counter_label(counter_id, 1000) {
862                    if label == "label_buffer" {
863                        self.found_counter = true;
864                        assert_eq!(
865                            &counters_reader.get_counter_key(counter_id).unwrap(),
866                            "key".as_bytes()
867                        );
868                    }
869                }
870            }
871        }
872
873        let mut available_counter_handler = Handler::leak(AvailableCounterHandler {
874            found_counter: false,
875        });
876        ctx.set_on_available_counter(Some(&available_counter_handler))?;
877
878        info!("creating client");
879        let aeron = Aeron::new(&ctx)?;
880        info!("starting client");
881
882        aeron.start()?;
883        info!("client started [counters test]");
884
885        let counter = aeron.add_counter(
886            123,
887            "key".as_bytes(),
888            "label_buffer",
889            Duration::from_secs(5),
890        )?;
891        let constants = counter.get_constants()?;
892        let counter_id = constants.counter_id;
893
894        let stop_publisher = Arc::new(AtomicBool::new(false));
895
896        let publisher_handler = {
897            let stop_publisher = stop_publisher.clone();
898            let counter = counter.clone();
899            std::thread::spawn(move || {
900                for _ in 0..150 {
901                    if stop_publisher.load(Ordering::Acquire) || counter.is_closed() {
902                        break;
903                    }
904                    counter.addr_atomic().fetch_add(1, Ordering::SeqCst);
905                }
906                info!("stopping publisher thread");
907            })
908        };
909
910        let now = Instant::now();
911        while counter.addr_atomic().load(Ordering::SeqCst) < 100
912            && now.elapsed() < Duration::from_secs(10)
913        {
914            sleep(Duration::from_micros(10));
915        }
916
917        assert!(now.elapsed() < Duration::from_secs(10));
918
919        info!(
920            "counter is {}",
921            counter.addr_atomic().load(Ordering::SeqCst)
922        );
923
924        info!("stopping client");
925
926        #[cfg(not(target_os = "windows"))] // not sure why windows version doesn't fire event
927        assert!(available_counter_handler.found_counter);
928
929        let reader = aeron.counters_reader();
930        assert_eq!(reader.get_counter_label(counter_id, 256)?, "label_buffer");
931        assert_eq!(reader.get_counter_key(counter_id)?, "key".as_bytes());
932        let buffers = AeronCountersReaderBuffers::default();
933        reader.get_buffers(&buffers)?;
934
935        stop_publisher.store(true, Ordering::SeqCst);
936
937        let _ = publisher_handler.join().unwrap();
938        drop(counter);
939        drop(aeron);
940
941        stop.store(true, Ordering::SeqCst);
942        let _ = driver_handle.join().unwrap();
943        available_counter_handler.release();
944        unavailable_counter_handler.release();
945        error_handler.release();
946        Ok(())
947    }
948
949    /// A simple error counter for testing error callback invocation.
950    #[derive(Default, Debug)]
951    struct TestErrorCount {
952        pub error_count: usize,
953    }
954
955    impl Drop for TestErrorCount {
956        fn drop(&mut self) {
957            info!("TestErrorCount dropped with {} errors", self.error_count);
958        }
959    }
960
961    impl AeronErrorHandlerCallback for TestErrorCount {
962        fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
963            error!("Aeron error {}: {}", error_code, msg);
964            self.error_count += 1;
965        }
966    }
967
968    #[test]
969    #[serial]
970    pub fn backpressure_recovery_test() -> Result<(), Box<dyn error::Error>> {
971        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
972
973        let under_valgrind = running_under_valgrind();
974        let driver_timeout_ms = if under_valgrind { 180_000 } else { 60_000 };
975        let liveness_timeout_ns = if under_valgrind {
976            180_000_000_000
977        } else {
978            60_000_000_000
979        };
980        let poll_timeout = Duration::from_millis(driver_timeout_ms as u64);
981
982        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
983        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
984        media_driver_ctx.set_dir_delete_on_start(true)?;
985        media_driver_ctx.set_dir(
986            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
987        )?;
988        media_driver_ctx.set_client_liveness_timeout_ns(liveness_timeout_ns)?;
989        media_driver_ctx.set_image_liveness_timeout_ns(liveness_timeout_ns)?;
990        media_driver_ctx.set_publication_unblock_timeout_ns(liveness_timeout_ns + 5_000_000_000)?;
991        media_driver_ctx.set_driver_timeout_ms(driver_timeout_ms)?;
992        let (stop, driver_handle) =
993            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
994
995        let ctx = AeronContext::new()?;
996        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
997        ctx.set_driver_timeout_ms(driver_timeout_ms)?;
998        let mut error_handler = Handler::leak(TestErrorCount::default());
999        ctx.set_error_handler(Some(&error_handler))?;
1000
1001        let aeron = Aeron::new(&ctx)?;
1002        aeron.start()?;
1003
1004        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
1005        let subscription = aeron.add_subscription(
1006            AERON_IPC_STREAM,
1007            123,
1008            Handlers::no_available_image_handler(),
1009            Handlers::no_unavailable_image_handler(),
1010            Duration::from_secs(5),
1011        )?;
1012
1013        let count = Arc::new(AtomicUsize::new(0));
1014        let start_time = Instant::now();
1015
1016        let stop_publisher = Arc::new(AtomicBool::new(false));
1017
1018        // Spawn a publisher thread that repeatedly sends "test" messages.
1019        let publisher_thread = {
1020            let stop_publisher = stop_publisher.clone();
1021            std::thread::spawn(move || {
1022                while !stop_publisher.load(Ordering::Acquire) {
1023                    let msg = b"test";
1024                    let result =
1025                        publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
1026                    // If backpressure is encountered, sleep a bit.
1027                    if result == AeronErrorType::PublicationBackPressured.code() as i64 {
1028                        sleep(Duration::from_millis(50));
1029                    }
1030                    if publisher.is_closed() {
1031                        break;
1032                    }
1033                }
1034            })
1035        };
1036
1037        // Poll using the inline closure via poll_once until we receive at least 50 messages.
1038        while count.load(Ordering::SeqCst) < 50 && start_time.elapsed() < poll_timeout {
1039            let _ = subscription.poll_once(
1040                |_msg, _header| {
1041                    count.fetch_add(1, Ordering::SeqCst);
1042                },
1043                128,
1044            )?;
1045        }
1046
1047        stop_publisher.store(true, Ordering::SeqCst);
1048        publisher_thread.join().unwrap();
1049        drop(subscription);
1050        drop(aeron);
1051        stop.store(true, Ordering::SeqCst);
1052        let _ = driver_handle.join().unwrap();
1053        error_handler.release();
1054
1055        assert!(
1056            count.load(Ordering::SeqCst) >= 50,
1057            "Expected at least 50 messages received"
1058        );
1059        Ok(())
1060    }
1061
1062    #[test]
1063    #[serial]
1064    pub fn multi_subscription_test() -> Result<(), Box<dyn error::Error>> {
1065        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
1066
1067        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
1068        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
1069        media_driver_ctx.set_dir_delete_on_start(true)?;
1070        media_driver_ctx.set_dir(
1071            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
1072        )?;
1073        media_driver_ctx.set_client_liveness_timeout_ns(60_000_000_000)?;
1074        media_driver_ctx.set_image_liveness_timeout_ns(60_000_000_000)?;
1075        media_driver_ctx.set_publication_unblock_timeout_ns(65_000_000_000)?;
1076        media_driver_ctx.set_driver_timeout_ms(60_000)?;
1077        let (_stop, driver_handle) =
1078            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1079
1080        let ctx = AeronContext::new()?;
1081        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
1082        let mut error_handler = Handler::leak(TestErrorCount::default());
1083        ctx.set_error_handler(Some(&error_handler))?;
1084
1085        let aeron = Aeron::new(&ctx)?;
1086        aeron.start()?;
1087        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
1088
1089        // Create two subscriptions on the same channel.
1090        let subscription1 = aeron.add_subscription(
1091            AERON_IPC_STREAM,
1092            123,
1093            Handlers::no_available_image_handler(),
1094            Handlers::no_unavailable_image_handler(),
1095            Duration::from_secs(5),
1096        )?;
1097        let subscription2 = aeron.add_subscription(
1098            AERON_IPC_STREAM,
1099            123,
1100            Handlers::no_available_image_handler(),
1101            Handlers::no_unavailable_image_handler(),
1102            Duration::from_secs(5),
1103        )?;
1104
1105        let count1 = Arc::new(AtomicUsize::new(0));
1106        let count2 = Arc::new(AtomicUsize::new(0));
1107
1108        // Publish a single message.
1109        let msg = b"hello multi-subscription";
1110        let result = publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
1111        assert!(
1112            result >= msg.len() as i64,
1113            "Message should be sent successfully"
1114        );
1115
1116        let start_time = Instant::now();
1117        // Poll both subscriptions with inline closures until each has received at least one message.
1118        while (count1.load(Ordering::SeqCst) < 1 || count2.load(Ordering::SeqCst) < 1)
1119            && start_time.elapsed() < Duration::from_secs(5)
1120        {
1121            let _ = subscription1.poll_once(
1122                |_msg, _header| {
1123                    count1.fetch_add(1, Ordering::SeqCst);
1124                },
1125                128,
1126            )?;
1127            let _ = subscription2.poll_once(
1128                |_msg, _header| {
1129                    count2.fetch_add(1, Ordering::SeqCst);
1130                },
1131                128,
1132            )?;
1133        }
1134
1135        assert!(
1136            count1.load(Ordering::SeqCst) >= 1,
1137            "Subscription 1 did not receive the message"
1138        );
1139        assert!(
1140            count2.load(Ordering::SeqCst) >= 1,
1141            "Subscription 2 did not receive the message"
1142        );
1143
1144        drop(subscription2);
1145        drop(subscription1);
1146        drop(publisher);
1147        drop(aeron);
1148        _stop.store(true, Ordering::SeqCst);
1149        let _ = driver_handle.join().unwrap();
1150        error_handler.release();
1151        Ok(())
1152    }
1153
1154    #[test]
1155    #[serial]
1156    pub fn should_be_able_to_drop_after_close_manually_being_closed(
1157    ) -> Result<(), Box<dyn error::Error>> {
1158        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
1159
1160        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
1161        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
1162        media_driver_ctx.set_dir_delete_on_start(true)?;
1163        media_driver_ctx.set_dir(
1164            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
1165        )?;
1166        let (_stop, driver_handle) =
1167            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1168
1169        let ctx = AeronContext::new()?;
1170        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
1171        let mut error_handler = Handler::leak(AeronErrorHandlerLogger);
1172        ctx.set_error_handler(Some(&error_handler))?;
1173
1174        let aeron = Aeron::new(&ctx)?;
1175        aeron.start()?;
1176
1177        {
1178            let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
1179            info!("created publication [sessionId={}]", publisher.session_id());
1180            publisher.close_with_no_args()?;
1181            drop(publisher);
1182        }
1183
1184        {
1185            let publisher = aeron.add_publication(AERON_IPC_STREAM, 124, Duration::from_secs(5))?;
1186            info!("created publication [sessionId={}]", publisher.session_id());
1187            publisher.close(Handlers::no_notification_handler())?;
1188            drop(publisher);
1189        }
1190
1191        {
1192            let publisher = aeron.add_publication(AERON_IPC_STREAM, 125, Duration::from_secs(5))?;
1193            publisher.close_once(|| println!("on close"))?;
1194            info!("created publication [sessionId={}]", publisher.session_id());
1195            drop(publisher);
1196        }
1197
1198        drop(aeron);
1199        _stop.store(true, Ordering::SeqCst);
1200        let _ = driver_handle.join().unwrap();
1201        error_handler.release();
1202        Ok(())
1203    }
1204
1205    #[test]
1206    #[serial]
1207    pub fn offer_on_closed_publication_error_test() -> Result<(), Box<dyn error::Error>> {
1208        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
1209
1210        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
1211        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
1212        media_driver_ctx.set_dir_delete_on_start(true)?;
1213        media_driver_ctx.set_dir(
1214            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
1215        )?;
1216        let (_stop, driver_handle) =
1217            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1218
1219        let ctx = AeronContext::new()?;
1220        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
1221        let mut error_handler = Handler::leak(TestErrorCount::default());
1222        ctx.set_error_handler(Some(&error_handler))?;
1223
1224        let aeron = Aeron::new(&ctx)?;
1225        aeron.start()?;
1226        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
1227
1228        // Close the publication immediately.
1229        publisher.close(Handlers::no_notification_handler())?;
1230
1231        // Attempt to send a message after the publication is closed.
1232        let result = publisher.offer(
1233            b"should fail",
1234            Handlers::no_reserved_value_supplier_handler(),
1235        );
1236        assert!(
1237            result < 0,
1238            "Offering on a closed publication should return a negative error code"
1239        );
1240
1241        drop(publisher);
1242        drop(aeron);
1243        _stop.store(true, Ordering::SeqCst);
1244        let _ = driver_handle.join().unwrap();
1245        error_handler.release();
1246        Ok(())
1247    }
1248
1249    /// Test sending and receiving an empty (zero-length) message using inline closures with poll_once.
1250    #[test]
1251    #[serial]
1252    pub fn empty_message_test() -> Result<(), Box<dyn error::Error>> {
1253        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
1254
1255        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
1256        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
1257        media_driver_ctx.set_dir_delete_on_start(true)?;
1258        media_driver_ctx.set_dir(
1259            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
1260        )?;
1261        let (_stop, driver_handle) =
1262            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1263
1264        let ctx = AeronContext::new()?;
1265        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
1266        let mut error_handler = Handler::leak(TestErrorCount::default());
1267        ctx.set_error_handler(Some(&error_handler))?;
1268
1269        let aeron = Aeron::new(&ctx)?;
1270        aeron.start()?;
1271        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
1272        let subscription = aeron.add_subscription(
1273            AERON_IPC_STREAM,
1274            123,
1275            Handlers::no_available_image_handler(),
1276            Handlers::no_unavailable_image_handler(),
1277            Duration::from_secs(5),
1278        )?;
1279
1280        let empty_received = Arc::new(AtomicBool::new(false));
1281        let start_time = Instant::now();
1282
1283        let result = publisher.offer(b"", Handlers::no_reserved_value_supplier_handler());
1284        assert!(result > 0);
1285
1286        while !empty_received.load(Ordering::SeqCst)
1287            && start_time.elapsed() < Duration::from_secs(5)
1288        {
1289            let _ = subscription.poll_once(
1290                |msg, _header| {
1291                    if msg.is_empty() {
1292                        empty_received.store(true, Ordering::SeqCst);
1293                    }
1294                },
1295                128,
1296            )?;
1297        }
1298
1299        assert!(
1300            empty_received.load(Ordering::SeqCst),
1301            "Empty message was not received"
1302        );
1303        drop(subscription);
1304        drop(publisher);
1305        drop(aeron);
1306        _stop.store(true, Ordering::SeqCst);
1307        let _ = driver_handle.join().unwrap();
1308        error_handler.release();
1309        Ok(())
1310    }
1311
1312    #[derive(Default, Debug)]
1313    struct MdcTotals {
1314        gap_events: u64,
1315        missing_messages: u64,
1316        received_messages: u64,
1317    }
1318
1319    #[derive(Debug)]
1320    struct MdcWindowStats {
1321        expected_seq: Option<u64>,
1322        gap_events: u64,
1323        missing_messages: u64,
1324        received_messages: u64,
1325        histogram: Histogram<u64>,
1326    }
1327
1328    impl MdcWindowStats {
1329        fn new() -> Result<Self, Box<dyn error::Error>> {
1330            Ok(Self {
1331                expected_seq: None,
1332                gap_events: 0,
1333                missing_messages: 0,
1334                received_messages: 0,
1335                histogram: Histogram::new(3)?,
1336            })
1337        }
1338
1339        fn observe(&mut self, seq: u64, sent_ts_ns: u64) {
1340            self.received_messages += 1;
1341
1342            match self.expected_seq {
1343                None => self.expected_seq = Some(seq.saturating_add(1)),
1344                Some(expected) if seq > expected => {
1345                    self.gap_events += 1;
1346                    self.missing_messages += seq - expected;
1347                    self.expected_seq = Some(seq.saturating_add(1));
1348                }
1349                Some(expected) if seq == expected => {
1350                    self.expected_seq = Some(expected.saturating_add(1));
1351                }
1352                Some(_) => {
1353                    // Ignore out-of-order/late packets for gap counting in this window.
1354                }
1355            }
1356
1357            let now_ns = Aeron::nano_clock().max(0) as u64;
1358            let latency_ns = now_ns.saturating_sub(sent_ts_ns);
1359            let _ = self.histogram.record(latency_ns);
1360        }
1361
1362        fn print_and_reset(
1363            &mut self,
1364            window_number: usize,
1365            interval: Duration,
1366            totals: &mut MdcTotals,
1367        ) {
1368            totals.gap_events += self.gap_events;
1369            totals.missing_messages += self.missing_messages;
1370            totals.received_messages += self.received_messages;
1371
1372            if self.histogram.len() > 0 {
1373                let min_us = self.histogram.min() / 1_000;
1374                let p50_us = self.histogram.value_at_quantile(0.50) / 1_000;
1375                let p99_us = self.histogram.value_at_quantile(0.99) / 1_000;
1376                let max_us = self.histogram.max() / 1_000;
1377                println!(
1378                    "[mdc-window-{window_number}] interval={interval:?} received={} gaps={} missing={} latency_us[min={}, p50={}, p99={}, max={}]",
1379                    self.received_messages, self.gap_events, self.missing_messages, min_us, p50_us, p99_us, max_us,
1380                );
1381            } else {
1382                println!(
1383                    "[mdc-window-{window_number}] interval={interval:?} received=0 gaps=0 missing=0 latency_us[min=n/a, p50=n/a, p99=n/a, max=n/a]"
1384                );
1385            }
1386
1387            self.expected_seq = None;
1388            self.gap_events = 0;
1389            self.missing_messages = 0;
1390            self.received_messages = 0;
1391            self.histogram.reset();
1392        }
1393    }
1394
1395    /// Run with loss profile on macOS:
1396    /// `just mdc-loss-run 120 10 0.10`
1397    ///
1398    /// The recipe handles PF setup and cleanup automatically.
1399    ///
1400    /// Manual cleanup (if needed):
1401    /// `sudo pfctl -a com.apple/rusteron-mdc-loss -F all`
1402    /// `sudo dnctl -q flush`
1403    /// `sudo pfctl -f /etc/pf.conf`
1404    #[test]
1405    #[serial]
1406    #[ignore] // Long-running diagnostics test for manual MDC with rolling latency/gap reports.
1407    pub fn mdc_unreliable_gap_latency_histogram_report() -> Result<(), Box<dyn error::Error>> {
1408        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
1409
1410        const STREAM_ID: i32 = 32931;
1411        const CONTROL_PORT: u16 = 32929;
1412        const SUBSCRIBER_PORT: u16 = 32930;
1413        const MESSAGE_LEN: usize = 130;
1414
1415        let report_interval = Duration::from_secs(
1416            std::env::var("RUSTERON_MDC_REPORT_INTERVAL_SECS")
1417                .ok()
1418                .and_then(|value| value.parse::<u64>().ok())
1419                .unwrap_or(10),
1420        );
1421        let test_duration = Duration::from_secs(
1422            std::env::var("RUSTERON_MDC_TEST_DURATION_SECS")
1423                .ok()
1424                .and_then(|value| value.parse::<u64>().ok())
1425                .unwrap_or(300),
1426        );
1427        let mdc_host = std::env::var("RUSTERON_MDC_HOST").unwrap_or("127.0.0.1".to_string());
1428
1429        let publication_channel = format!(
1430            "aeron:udp?control-mode=manual|control={}:{CONTROL_PORT}",
1431            mdc_host
1432        );
1433        // - group=false: do not apply MDC receiver-group semantics.
        // - nak-delay=500us: shorten unreliable-stream gap-fill decision latency.
1435        let subscription_channel = format!(
1436            "aeron:udp?endpoint={}:{SUBSCRIBER_PORT}|reliable=false|tether=false|group=false|nak-delay=500us",
1437            mdc_host
1438        );
1439        let destination_uri = format!("aeron:udp?endpoint={}:{SUBSCRIBER_PORT}", mdc_host);
1440
1441        let (media_driver_ctx, stop_driver, driver_handle) = start_media_driver(32930)?;
1442        let aeron_dir = media_driver_ctx.get_dir().to_string();
1443
1444        println!(
1445            "[mdc-start] publication={} subscription={} duration={:?} report_interval={:?}",
1446            publication_channel, subscription_channel, test_duration, report_interval
1447        );
1448
1449        let running = Arc::new(AtomicBool::new(true));
1450
1451        let subscriber_dir = aeron_dir.clone();
1452        let subscriber_channel = subscription_channel.clone();
1453        let subscriber_running = Arc::clone(&running);
1454        let subscriber_thread = std::thread::spawn(move || -> MdcTotals {
1455            let (_ctx, aeron) = create_client_for_dir(&subscriber_dir)
1456                .expect("failed to create subscriber aeron client");
1457
1458            let subscription = aeron
1459                .add_subscription(
1460                    &subscriber_channel.into_c_string(),
1461                    STREAM_ID,
1462                    Handlers::no_available_image_handler(),
1463                    Handlers::no_unavailable_image_handler(),
1464                    Duration::from_secs(5),
1465                )
1466                .expect("failed to create subscriber");
1467
1468            let mut totals = MdcTotals::default();
1469            let mut window_stats = MdcWindowStats::new().expect("failed to create histogram");
1470            let test_start = Instant::now();
1471            let mut window_start = test_start;
1472            let mut window_number = 1usize;
1473
1474            while test_start.elapsed() < test_duration {
1475                let _ = subscription
1476                    .poll_once(
1477                        |msg, _header| {
1478                            if msg.len() < 16 {
1479                                return;
1480                            }
1481
1482                            let seq = u64::from_le_bytes(msg[0..8].try_into().unwrap());
1483                            let sent_ts_ns = u64::from_le_bytes(msg[8..16].try_into().unwrap());
1484                            window_stats.observe(seq, sent_ts_ns);
1485                        },
1486                        10_000,
1487                    )
1488                    .expect("subscriber poll failed");
1489
1490                if window_start.elapsed() >= report_interval {
1491                    window_stats.print_and_reset(window_number, report_interval, &mut totals);
1492                    window_number += 1;
1493                    window_start = Instant::now();
1494                }
1495            }
1496
1497            window_stats.print_and_reset(window_number, report_interval, &mut totals);
1498            subscriber_running.store(false, Ordering::SeqCst);
1499            totals
1500        });
1501
1502        // Ensure subscriber has started before publisher setup.
1503        sleep(Duration::from_millis(250));
1504
1505        let publisher_dir = aeron_dir;
1506        let publisher_channel = publication_channel.clone();
1507        let publisher_destination = destination_uri.clone();
1508        let publisher_running = Arc::clone(&running);
1509        let publisher_thread = std::thread::spawn(move || -> u64 {
1510            let (_ctx, aeron) = create_client_for_dir(&publisher_dir)
1511                .expect("failed to create publisher aeron client");
1512
1513            let publication = aeron
1514                .add_exclusive_publication(
1515                    &publisher_channel.into_c_string(),
1516                    STREAM_ID,
1517                    Duration::from_secs(5),
1518                )
1519                .expect("failed to create publication");
1520
1521            let add_destination =
1522                AeronAsyncDestination::aeron_exclusive_publication_async_add_destination(
1523                    &aeron,
1524                    &publication,
1525                    &publisher_destination.into_c_string(),
1526                )
1527                .expect("failed to add manual MDC destination");
1528
1529            let add_destination_start = Instant::now();
1530            while add_destination
1531                .aeron_exclusive_publication_async_destination_poll()
1532                .expect("destination add poll failed")
1533                == 0
1534            {
1535                assert!(
1536                    add_destination_start.elapsed() <= Duration::from_secs(5),
1537                    "Timed out adding manual MDC destination"
1538                );
1539                sleep(Duration::from_millis(10));
1540            }
1541
1542            let connect_start = Instant::now();
1543            while !publication.is_connected() && connect_start.elapsed() < Duration::from_secs(5) {
1544                sleep(Duration::from_millis(10));
1545            }
1546            assert!(
1547                publication.is_connected(),
1548                "manual MDC publication did not connect to subscriber destination"
1549            );
1550
1551            let mut seq: u64 = 0;
1552            let mut payload = [0u8; MESSAGE_LEN];
1553            while publisher_running.load(Ordering::Acquire) {
1554                payload[0..8].copy_from_slice(&seq.to_le_bytes());
1555                let ts_ns = Aeron::nano_clock().max(0) as u64;
1556                payload[8..16].copy_from_slice(&ts_ns.to_le_bytes());
1557
1558                let result =
1559                    publication.offer(&payload, Handlers::no_reserved_value_supplier_handler());
1560                if result > 0 {
1561                    seq = seq.wrapping_add(1);
1562                }
1563                sleep(Duration::from_millis(1));
1564            }
1565            seq
1566        });
1567
1568        let totals = subscriber_thread.join().unwrap();
1569        running.store(false, Ordering::SeqCst);
1570        let sent_messages = publisher_thread.join().unwrap();
1571        stop_driver.store(true, Ordering::SeqCst);
1572        let _ = driver_handle.join().unwrap();
1573
1574        println!(
1575            "[mdc-summary] sent={} received={} total_gaps={} total_missing={}",
1576            sent_messages, totals.received_messages, totals.gap_events, totals.missing_messages
1577        );
1578
1579        assert!(sent_messages > 0, "publisher failed to send any messages");
1580        assert!(
1581            totals.received_messages > 0,
1582            "subscriber did not receive any messages"
1583        );
1584        Ok(())
1585    }
1586
1587    #[test]
1588    #[serial]
1589    #[ignore] // need to work to get tags working properly, its more of testing issue then tag issue
1590    pub fn tags() -> Result<(), Box<dyn error::Error>> {
1591        rusteron_code_gen::test_logger::init(log::LevelFilter::Debug);
1592
1593        let (md_ctx, stop, md) = start_media_driver(1)?;
1594
1595        let (_a_ctx2, aeron_sub) = create_client(&md_ctx)?;
1596
1597        info!("creating suscriber 1");
1598        let sub = aeron_sub
1599            .add_subscription(
1600                &"aeron:udp?tags=100".into_c_string(),
1601                123,
1602                Handlers::no_available_image_handler(),
1603                Handlers::no_unavailable_image_handler(),
1604                Duration::from_secs(50),
1605            )
1606            .map_err(|e| {
1607                error!("aeron error={}", Aeron::errmsg());
1608                e
1609            })?;
1610
1611        let ctx = AeronContext::new()?;
1612        ctx.set_dir(&aeron_sub.context().get_dir().into_c_string())?;
1613        let aeron = Aeron::new(&ctx)?;
1614        aeron.start()?;
1615
1616        info!("creating suscriber 2");
1617        let sub2 = aeron_sub.add_subscription(
1618            &"aeron:udp?tags=100".into_c_string(),
1619            123,
1620            Handlers::no_available_image_handler(),
1621            Handlers::no_unavailable_image_handler(),
1622            Duration::from_secs(50),
1623        )?;
1624
1625        let (_a_ctx1, aeron_pub) = create_client(&md_ctx)?;
1626        info!("creating publisher");
1627        assert!(!aeron_pub.is_closed());
1628        let publisher = aeron_pub
1629            .add_publication(
1630                &"aeron:udp?endpoint=localhost:4040|alias=test|tags=100".into_c_string(),
1631                123,
1632                Duration::from_secs(5),
1633            )
1634            .map_err(|e| {
1635                error!("aeron error={}", Aeron::errmsg());
1636                e
1637            })?;
1638
1639        info!("publishing msg");
1640
1641        loop {
1642            let result = publisher.offer(
1643                "213".as_bytes(),
1644                Handlers::no_reserved_value_supplier_handler(),
1645            );
1646            if result < 0 {
1647                error!(
1648                    "failed to publish {:?}",
1649                    AeronCError::from_code(result as i32)
1650                );
1651            } else {
1652                break;
1653            }
1654        }
1655
1656        sub.poll_once(
1657            |msg, _header| {
1658                println!("Received message: {:?}", msg);
1659            },
1660            128,
1661        )?;
1662        sub2.poll_once(
1663            |msg, _header| {
1664                println!("Received message: {:?}", msg);
1665            },
1666            128,
1667        )?;
1668
1669        stop.store(true, Ordering::SeqCst);
1670
1671        Ok(())
1672    }
1673
1674    fn create_client_for_dir(dir: &str) -> Result<(AeronContext, Aeron), Box<dyn Error>> {
1675        info!("creating aeron client [dir={}]", dir);
1676        let ctx = AeronContext::new()?;
1677        ctx.set_dir(&dir.into_c_string())?;
1678        let aeron = Aeron::new(&ctx)?;
1679        aeron.start()?;
1680        Ok((ctx, aeron))
1681    }
1682
1683    fn create_client(
1684        media_driver_ctx: &AeronDriverContext,
1685    ) -> Result<(AeronContext, Aeron), Box<dyn Error>> {
1686        let dir = media_driver_ctx.get_dir().to_string();
1687        create_client_for_dir(&dir)
1688    }
1689
1690    fn start_media_driver(
1691        instance: u64,
1692    ) -> Result<
1693        (
1694            AeronDriverContext,
1695            Arc<AtomicBool>,
1696            JoinHandle<Result<(), rusteron_media_driver::AeronCError>>,
1697        ),
1698        Box<dyn Error>,
1699    > {
1700        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
1701        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
1702        media_driver_ctx.set_dir_delete_on_start(true)?;
1703        media_driver_ctx.set_dir(
1704            &format!(
1705                "{}{}-{}",
1706                media_driver_ctx.get_dir(),
1707                Aeron::epoch_clock(),
1708                instance
1709            )
1710            .into_c_string(),
1711        )?;
1712        let (stop, driver_handle) =
1713            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1714        Ok((media_driver_ctx, stop, driver_handle))
1715    }
1716
    // Empty module whose documentation is the repository-level README, so the README's
    // code examples become this module's doc examples.
    // NOTE(review): this sits inside `#[cfg(test)] mod tests`; rustdoc gathers doctests
    // without `cfg(test)`, so confirm these README examples are actually exercised.
    #[doc = include_str!("../../README.md")]
    mod readme_tests {}
1719}