Skip to main content

rusteron_client/
lib.rs

1#![allow(improper_ctypes_definitions)]
2#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
9//! # Features
10//!
11//! - **`static`**: When enabled, this feature statically links the Aeron C code.
12//!   By default, the library uses dynamic linking to the Aeron C libraries.
13//! - **`backtrace`**: When enabled will log a backtrace for each AeronCError
14//! - **`extra-logging`**: When enabled will log when resource is created and destroyed. Useful if you're seeing a segfault due to a resource being closed
15//! - **`log-c-bindings`**: When enabled will log every C binding call with arguments and return values. Useful for debugging FFI interactions
16//! - **`precompile`**: When enabled will use precompiled C code instead of requiring cmake and java to be installed
17
18#[allow(improper_ctypes_definitions)]
19#[allow(unpredictable_function_pointer_comparisons)]
20pub mod bindings {
21    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
22}
23
24use bindings::*;
25
26include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
27include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
28
29#[cfg(test)]
30mod tests {
31    use super::*;
32    use crate::test_alloc::current_allocs;
33    use hdrhistogram::Histogram;
34    use log::{error, info};
35    use rusteron_media_driver::AeronDriverContext;
36    use serial_test::serial;
37    use std::error;
38    use std::error::Error;
39    use std::io::Write;
40    use std::os::raw::c_int;
41    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
42    use std::sync::Arc;
43    use std::thread::{sleep, JoinHandle};
44    use std::time::{Duration, Instant};
45
46    #[derive(Default, Debug)]
47    struct ErrorCount {
48        error_count: usize,
49    }
50
51    impl AeronErrorHandlerCallback for ErrorCount {
52        fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
53            error!("Aeron error {}: {}", error_code, msg);
54            self.error_count += 1;
55        }
56    }
57
58    fn running_under_valgrind() -> bool {
59        std::env::var_os("RUSTERON_VALGRIND").is_some()
60    }
61
62    #[test]
63    #[serial]
64    fn version_check() -> Result<(), Box<dyn error::Error>> {
65        unsafe {
66            aeron_randomised_int32();
67        }
68        let alloc_count = current_allocs();
69
70        {
71            let major = unsafe { crate::aeron_version_major() };
72            let minor = unsafe { crate::aeron_version_minor() };
73            let patch = unsafe { crate::aeron_version_patch() };
74
75            let cargo_version = "1.51.0";
76            let aeron_version = format!("{}.{}.{}", major, minor, patch);
77            assert_eq!(aeron_version, cargo_version);
78
79            let ctx = AeronContext::new()?;
80            let error_count = 1;
81            let mut handler = Handler::leak(ErrorCount::default());
82            ctx.set_error_handler(Some(&handler))?;
83
84            assert!(Aeron::epoch_clock() > 0);
85            drop(ctx);
86            assert!(handler.should_drop);
87            handler.release();
88            assert!(!handler.should_drop);
89            drop(handler);
90        }
91
92        assert!(
93            current_allocs() <= alloc_count,
94            "allocations {} > {alloc_count}",
95            current_allocs()
96        );
97
98        Ok(())
99    }
100
101    #[test]
102    #[serial]
103    fn async_publication_invalid_interface_poll_then_drop() -> Result<(), Box<dyn error::Error>> {
104        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
105
106        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
107        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
108        media_driver_ctx.set_dir_delete_on_start(true)?;
109        media_driver_ctx.set_dir(
110            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
111        )?;
112        let (stop, driver_handle) =
113            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
114
115        let ctx = AeronContext::new()?;
116        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
117        let mut error_handler = Handler::leak(ErrorCount::default());
118        ctx.set_error_handler(Some(&error_handler))?;
119        let aeron = Aeron::new(&ctx)?;
120        aeron.start()?;
121
122        let channel = String::from("aeron:udp?endpoint=203.0.113.1:54321");
123
124        // Create async publication and subscription pollers on the same invalid channel and
125        // attempt to resolve them. If both are created, try a small send/receive cycle and then exit.
126        let pub_poller = aeron.async_add_publication(&channel.clone().into_c_string(), 4321)?;
127        let sub_poller = aeron.async_add_subscription(
128            &channel.into_c_string(),
129            4321,
130            Handlers::no_available_image_handler(),
131            Handlers::no_unavailable_image_handler(),
132        )?;
133
134        let mut publication: Option<AeronPublication> = None;
135        let mut subscription: Option<AeronSubscription> = None;
136        let start = Instant::now();
137        while start.elapsed() < Duration::from_secs(2) {
138            if publication.is_none() {
139                match pub_poller.poll() {
140                    Ok(Some(p)) => publication = Some(p),
141                    Ok(None) | Err(_) => {}
142                }
143            }
144            if subscription.is_none() {
145                match sub_poller.poll() {
146                    Ok(Some(s)) => subscription = Some(s),
147                    Ok(None) | Err(_) => {}
148                }
149            }
150            if publication.is_some() && subscription.is_some() {
151                break;
152            }
153            #[cfg(debug_assertions)]
154            std::thread::sleep(Duration::from_millis(10));
155        }
156
157        info!("publication: {:?}", publication);
158        info!("subscription: {:?}", subscription);
159
160        if let (Some(publisher), Some(subscription)) = (publication, subscription) {
161            let payload = b"hello-aeron";
162            let send_start = Instant::now();
163            let mut sent = false;
164            while send_start.elapsed() < Duration::from_millis(500) {
165                let res = publisher.offer(payload, Handlers::no_reserved_value_supplier_handler());
166                if res >= payload.len() as i64 {
167                    sent = true;
168                    info!("sent {:?}", payload);
169                    break;
170                }
171                std::thread::sleep(Duration::from_millis(10));
172            }
173
174            if sent {
175                let mut got = false;
176                let read_start = Instant::now();
177                while read_start.elapsed() < Duration::from_millis(500) {
178                    let _ = subscription.poll_once(
179                        |msg, _hdr| {
180                            if msg == payload {
181                                got = true;
182                                info!("received {:?}", payload);
183                            }
184                        },
185                        1024,
186                    );
187                    if got {
188                        break;
189                    }
190                    std::thread::sleep(Duration::from_millis(10));
191                }
192                // We don't assert on got, just exercise the path.
193            }
194        }
195
196        // Shutdown
197        drop(sub_poller);
198        drop(pub_poller);
199        drop(aeron);
200        stop.store(true, Ordering::SeqCst);
201        let _ = driver_handle.join().unwrap();
202        error_handler.release();
203        Ok(())
204    }
205
206    #[test]
207    #[serial]
208    fn async_pub_sub_invalid_endpoint_create_drop_stress() -> Result<(), Box<dyn error::Error>> {
209        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
210
211        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
212        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
213        media_driver_ctx.set_dir_delete_on_start(true)?;
214        media_driver_ctx.set_dir(
215            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
216        )?;
217        let (stop, driver_handle) =
218            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
219
220        let ctx = AeronContext::new()?;
221        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
222        let mut error_handler = Handler::leak(ErrorCount::default());
223        ctx.set_error_handler(Some(&error_handler))?;
224        let aeron = Aeron::new(&ctx)?;
225        aeron.start()?;
226
227        const STRESS_ITERS: u16 = 60;
228        const POLL_TIMEOUT: Duration = Duration::from_secs(10);
229        const POLL_SLEEP: Duration = Duration::from_millis(10);
230
231        // Stress: repeatedly create async pub/sub on an invalid endpoint and drive each
232        // poller to a terminal state before dropping it.
233        for i in 0..STRESS_ITERS {
234            let port = 55000u16 + i;
235            let channel = format!("aeron:udp?endpoint=203.0.113.1:{}", port);
236            let pub_poller =
237                aeron.async_add_publication(&channel.clone().into_c_string(), 4500 + i as i32)?;
238            let sub_poller = aeron.async_add_subscription(
239                &channel.into_c_string(),
240                4500 + i as i32,
241                Handlers::no_available_image_handler(),
242                Handlers::no_unavailable_image_handler(),
243            )?;
244
245            let start = Instant::now();
246            let mut publication = None;
247            let mut publication_done = false;
248            let mut subscription = None;
249            let mut subscription_done = false;
250
251            while !(publication_done && subscription_done) && start.elapsed() < POLL_TIMEOUT {
252                if !publication_done {
253                    match pub_poller.poll() {
254                        Ok(Some(pub_)) => {
255                            publication = Some(pub_);
256                            publication_done = true;
257                        }
258                        Ok(None) => {}
259                        Err(err) => {
260                            info!("publication async add finished with error on iteration {i}: {err:?}");
261                            publication_done = true;
262                        }
263                    }
264                }
265
266                if !subscription_done {
267                    match sub_poller.poll() {
268                        Ok(Some(sub_)) => {
269                            subscription = Some(sub_);
270                            subscription_done = true;
271                        }
272                        Ok(None) => {}
273                        Err(err) => {
274                            info!("subscription async add finished with error on iteration {i}: {err:?}");
275                            subscription_done = true;
276                        }
277                    }
278                }
279
280                if !(publication_done && subscription_done) {
281                    std::thread::sleep(POLL_SLEEP);
282                }
283            }
284
285            assert!(
286                publication_done,
287                "publication async add did not complete on iteration {i} within {:?}",
288                POLL_TIMEOUT
289            );
290            assert!(
291                subscription_done,
292                "subscription async add did not complete on iteration {i} within {:?}",
293                POLL_TIMEOUT
294            );
295
296            drop(subscription);
297            drop(publication);
298            drop(sub_poller);
299            drop(pub_poller);
300        }
301
302        drop(aeron);
303        stop.store(true, Ordering::SeqCst);
304        let _ = driver_handle.join().unwrap();
305        error_handler.release();
306        Ok(())
307    }
308
309    #[test]
310    #[serial]
311    // // #[ignore] // TODO FIXME broken test
312    fn async_subscription_invalid_interface_poll_then_drop() -> Result<(), Box<dyn error::Error>> {
313        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
314
315        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
316        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
317        media_driver_ctx.set_dir_delete_on_start(true)?;
318        media_driver_ctx.set_dir(
319            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
320        )?;
321        let (stop, driver_handle) =
322            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
323
324        let ctx = AeronContext::new()?;
325        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
326        let mut error_handler = Handler::leak(ErrorCount::default());
327        ctx.set_error_handler(Some(&error_handler))?;
328        let aeron = Aeron::new(&ctx)?;
329        aeron.start()?;
330
331        // Invalid remote endpoint only (no interface)
332        let channel = String::from("aeron:udp?endpoint=203.0.113.1:54323");
333
334        let poller = aeron.async_add_subscription(
335            &channel.into_c_string(),
336            4323,
337            Handlers::no_available_image_handler(),
338            Handlers::no_unavailable_image_handler(),
339        )?;
340
341        let start = Instant::now();
342        while start.elapsed() < Duration::from_millis(250) {
343            let _ = poller.poll();
344            #[cfg(debug_assertions)]
345            std::thread::sleep(Duration::from_millis(10));
346        }
347
348        drop(poller);
349        drop(aeron);
350        stop.store(true, Ordering::SeqCst);
351        let _ = driver_handle.join().unwrap();
352        error_handler.release();
353        Ok(())
354    }
355
356    #[test]
357    #[serial]
358    fn blocking_add_subscription_invalid_interface_timeout() -> Result<(), Box<dyn error::Error>> {
359        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
360
361        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
362        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
363        media_driver_ctx.set_dir_delete_on_start(true)?;
364        media_driver_ctx.set_dir(
365            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
366        )?;
367        let (stop, driver_handle) =
368            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
369
370        let ctx = AeronContext::new()?;
371        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
372        let mut error_handler = Handler::leak(ErrorCount::default());
373        ctx.set_error_handler(Some(&error_handler))?;
374        let aeron = Aeron::new(&ctx)?;
375        aeron.start()?;
376
377        let channel = String::from("aeron:udp?endpoint=203.0.113.1:54324");
378
379        let result = aeron.add_subscription(
380            &channel.into_c_string(),
381            4324,
382            Handlers::no_available_image_handler(),
383            Handlers::no_unavailable_image_handler(),
384            Duration::from_millis(300),
385        );
386
387        assert!(result.is_err(), "expected error for invalid interface");
388        drop(aeron);
389        stop.store(true, Ordering::SeqCst);
390        let _ = driver_handle.join().unwrap();
391        error_handler.release();
392        Ok(())
393    }
394
395    #[test]
396    #[serial]
397    fn async_publication_invalid_bind_poll_then_drop() -> Result<(), Box<dyn error::Error>> {
398        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
399
400        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
401        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
402        media_driver_ctx.set_dir_delete_on_start(true)?;
403        media_driver_ctx.set_dir(
404            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
405        )?;
406        let (stop, driver_handle) =
407            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
408
409        let ctx = AeronContext::new()?;
410        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
411        let mut error_handler = Handler::leak(ErrorCount::default());
412        ctx.set_error_handler(Some(&error_handler))?;
413        let aeron = Aeron::new(&ctx)?;
414        aeron.start()?;
415
416        // Use an invalid bind on publication (bind is not valid for publication, and the IP is unowned).
417        let channel = format!("aeron:udp?endpoint=127.0.0.1:54330|bind=203.0.113.1:60000");
418
419        let poller = aeron.async_add_publication(&channel.into_c_string(), 4330)?;
420        let start = Instant::now();
421        while start.elapsed() < Duration::from_millis(250) {
422            let _ = poller.poll();
423            #[cfg(debug_assertions)]
424            std::thread::sleep(Duration::from_millis(10));
425        }
426        drop(poller);
427        drop(aeron);
428        stop.store(true, Ordering::SeqCst);
429        let _ = driver_handle.join().unwrap();
430        error_handler.release();
431        Ok(())
432    }
433
434    #[test]
435    #[serial]
436    pub fn simple_large_send() -> Result<(), Box<dyn error::Error>> {
437        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
438        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
439        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
440        media_driver_ctx.set_dir_delete_on_start(true)?;
441        media_driver_ctx.set_dir(
442            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
443        )?;
444        // Under Valgrind execution is ~10x slower; increase liveness timeouts so the
445        // driver doesn't evict the client before the test finishes.
446        media_driver_ctx.set_client_liveness_timeout_ns(60_000_000_000)?; // 60 s
447        media_driver_ctx.set_image_liveness_timeout_ns(60_000_000_000)?; // 60 s
448        media_driver_ctx.set_publication_unblock_timeout_ns(65_000_000_000)?; // 65 s
449        media_driver_ctx.set_driver_timeout_ms(60_000)?; // 60 s
450        let (stop, driver_handle) =
451            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
452
453        let ctx = AeronContext::new()?;
454        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
455        assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
456        // Keep client-side keepalive threshold aligned with the slow Valgrind environment.
457        ctx.set_driver_timeout_ms(60_000)?;
458        // Store all handlers so we can call release() after Aeron is stopped.
459        // Anonymous Handler::leak() temporaries would be dropped without release(),
460        // triggering the Drop impl warning and leaving heap allocations permanently orphaned.
461        let mut error_handler = Handler::leak(ErrorCount::default());
462        let mut new_pub_handler = Handler::leak(AeronNewPublicationLogger);
463        let mut avail_counter_handler1 = Handler::leak(AeronAvailableCounterLogger);
464        let mut close_client_handler = Handler::leak(AeronCloseClientLogger);
465        let mut new_sub_handler = Handler::leak(AeronNewSubscriptionLogger);
466        let mut unavail_counter_handler = Handler::leak(AeronUnavailableCounterLogger);
467        let mut avail_counter_handler2 = Handler::leak(AeronAvailableCounterLogger);
468        let mut excl_pub_handler = Handler::leak(AeronNewPublicationLogger);
469        ctx.set_error_handler(Some(&error_handler))?;
470        ctx.set_on_new_publication(Some(&new_pub_handler))?;
471        ctx.set_on_available_counter(Some(&avail_counter_handler1))?;
472        ctx.set_on_close_client(Some(&close_client_handler))?;
473        ctx.set_on_new_subscription(Some(&new_sub_handler))?;
474        ctx.set_on_unavailable_counter(Some(&unavail_counter_handler))?;
475        ctx.set_on_available_counter(Some(&avail_counter_handler2))?;
476        ctx.set_on_new_exclusive_publication(Some(&excl_pub_handler))?;
477
478        info!("creating client [simple_large_send test]");
479        let aeron = Aeron::new(&ctx)?;
480        info!("starting client");
481
482        aeron.start()?;
483        info!("client started");
484        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
485        info!("created publisher");
486
487        assert!(AeronCncMetadata::load_from_file(ctx.get_dir())?.pid > 0);
488        let cstr = std::ffi::CString::new(ctx.get_dir()).unwrap();
489        AeronCncMetadata::read_from_file(&cstr, |cnc| {
490            assert!(cnc.pid > 0);
491        })?;
492        assert!(AeronCnc::new_on_heap(ctx.get_dir())?.get_to_driver_heartbeat_ms()? > 0);
493        let cstr = std::ffi::CString::new(ctx.get_dir()).unwrap();
494        for _ in 0..50 {
495            AeronCnc::read_on_partial_stack(&cstr, |cnc| {
496                assert!(cnc.get_to_driver_heartbeat_ms().unwrap() > 0);
497            })?;
498        }
499
500        let subscription = aeron.add_subscription(
501            AERON_IPC_STREAM,
502            123,
503            Handlers::no_available_image_handler(),
504            Handlers::no_unavailable_image_handler(),
505            Duration::from_secs(5),
506        )?;
507        info!("created subscription");
508
509        subscription
510            .poll_once(|msg, header| println!("foo"), 1024)
511            .unwrap();
512
513        // pick a large enough size to confirm fragement assembler is working
514        let string_len = media_driver_ctx.ipc_mtu_length * 100;
515        info!("string length: {}", string_len);
516
517        let stop_publisher = Arc::new(AtomicBool::new(false));
518
519        let publisher_handler = {
520            let stop_publisher = stop_publisher.clone();
521            std::thread::spawn(move || {
522                let binding = "1".repeat(string_len);
523                let large_msg = binding.as_bytes();
524                loop {
525                    if stop_publisher.load(Ordering::Acquire) || publisher.is_closed() {
526                        break;
527                    }
528                    let result =
529                        publisher.offer(large_msg, Handlers::no_reserved_value_supplier_handler());
530
531                    assert_eq!(123, publisher.get_constants().unwrap().stream_id);
532
533                    if result < large_msg.len() as i64 {
534                        let error = AeronCError::from_code(result as i32);
535                        match error.kind() {
536                            AeronErrorType::PublicationBackPressured
537                            | AeronErrorType::PublicationAdminAction => {
538                                // ignore
539                            }
540                            _ => {
541                                error!(
542                                    "ERROR: failed to send message {:?}",
543                                    AeronCError::from_code(result as i32)
544                                );
545                            }
546                        }
547                        sleep(Duration::from_millis(500));
548                    }
549                }
550                info!("stopping publisher thread");
551            })
552        };
553
554        let mut assembler = AeronFragmentClosureAssembler::new()?;
555
556        struct Context {
557            count: Arc<AtomicUsize>,
558            stop: Arc<AtomicBool>,
559            string_len: usize,
560        }
561
562        let count = Arc::new(AtomicUsize::new(0usize));
563        let mut context = Context {
564            count: count.clone(),
565            stop: stop.clone(),
566            string_len,
567        };
568
569        // Start the timer
570        let start_time = Instant::now();
571
572        // Use break-with-value so cleanup (handler release, driver stop) always runs.
573        // 120-second timeout: under Valgrind execution is ~10× slower, so 30 s is too tight.
574        let loop_result: Result<(), Box<dyn error::Error>> = loop {
575            if start_time.elapsed() > Duration::from_secs(120) {
576                info!("Failed: exceeded 120-second timeout");
577                break Err(Box::new(std::io::Error::new(
578                    std::io::ErrorKind::TimedOut,
579                    "Timeout exceeded",
580                )));
581            }
582            let c = count.load(Ordering::SeqCst);
583            if c > 100 {
584                break Ok(());
585            }
586
587            fn process_msg(ctx: &mut Context, buffer: &[u8], header: AeronHeader) {
588                ctx.count.fetch_add(1, Ordering::SeqCst);
589
590                let values = header.get_values().unwrap();
591                assert_ne!(values.frame.session_id, 0);
592
593                if buffer.len() != ctx.string_len {
594                    ctx.stop.store(true, Ordering::SeqCst);
595                    error!(
596                        "ERROR: message was {} but was expecting {} [header={:?}]",
597                        buffer.len(),
598                        ctx.string_len,
599                        header
600                    );
601                    sleep(Duration::from_secs(1));
602                }
603
604                assert_eq!(buffer.len(), ctx.string_len);
605                assert_eq!(buffer, "1".repeat(ctx.string_len).as_bytes());
606            }
607
608            subscription.poll(assembler.process(&mut context, process_msg), 128)?;
609            assert_eq!(123, subscription.get_constants().unwrap().stream_id);
610        };
611
612        subscription.close(Handlers::no_notification_handler())?;
613
614        info!("stopping client");
615        stop_publisher.store(true, Ordering::SeqCst);
616
617        let _ = publisher_handler.join().unwrap();
618        drop(subscription);
619        drop(aeron);
620
621        stop.store(true, Ordering::SeqCst);
622        let _ = driver_handle.join().unwrap();
623
624        // Release all context handlers now that Aeron and the driver are fully stopped.
625        error_handler.release();
626        new_pub_handler.release();
627        avail_counter_handler1.release();
628        close_client_handler.release();
629        new_sub_handler.release();
630        unavail_counter_handler.release();
631        avail_counter_handler2.release();
632        excl_pub_handler.release();
633
634        let cnc = AeronCnc::new_on_heap(ctx.get_dir())?;
635        cnc.counters_reader().foreach_counter_once(
636            |value: i64, id: i32, type_id: i32, key: &[u8], label: &str| {
637                println!("counter reader id={id}, type_id={type_id}, key={key:?}, label={label}, value={value} [type={:?}]",
638                AeronSystemCounterType::try_from(type_id));
639            },
640        );
641        cnc.error_log_read_once(| observation_count: i32,
642                                     first_observation_timestamp: i64,
643                                     last_observation_timestamp: i64,
644                                     error: &str| {
645            println!("error: {error} observationCount={observation_count}, first_observation_timestamp={first_observation_timestamp}, last_observation_timestamp={last_observation_timestamp}");
646        }, 0);
647        cnc.loss_reporter_read_once(|    observation_count: i64,
648                                    total_bytes_lost: i64,
649                                    first_observation_timestamp: i64,
650                                    last_observation_timestamp: i64,
651                                    session_id: i32,
652                                    stream_id: i32,
653                                    channel: &str,
654                                    source: &str,| {
655            println!("loss reporter observationCount={observation_count}, totalBytesLost={total_bytes_lost}, first_observed={first_observation_timestamp}, last_observed={last_observation_timestamp}, session_id={session_id}, stream_id={stream_id}, channel={channel} source={source}");
656        })?;
657
658        loop_result?;
659        Ok(())
660    }
661
662    #[test]
663    #[serial]
664    pub fn try_claim() -> Result<(), Box<dyn error::Error>> {
665        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
666        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
667        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
668        media_driver_ctx.set_dir_delete_on_start(true)?;
669        media_driver_ctx.set_dir(
670            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
671        )?;
672        // Under Valgrind execution is ~10x slower; increase liveness timeouts so the
673        // driver doesn't evict the client before the test finishes.
674        media_driver_ctx.set_client_liveness_timeout_ns(60_000_000_000)?; // 60 s
675        media_driver_ctx.set_image_liveness_timeout_ns(60_000_000_000)?; // 60 s
676        media_driver_ctx.set_publication_unblock_timeout_ns(65_000_000_000)?; // 65 s
677        media_driver_ctx.set_driver_timeout_ms(60_000)?; // 60 s
678        let (stop, driver_handle) =
679            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
680
681        let ctx = AeronContext::new()?;
682        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
683        assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
684        // Keep client-side keepalive threshold aligned with slow Valgrind environment.
685        ctx.set_driver_timeout_ms(60_000)?;
686        let mut error_handler = Handler::leak(ErrorCount::default());
687        ctx.set_error_handler(Some(&error_handler))?;
688
689        info!("creating client [try_claim test]");
690        let aeron = Aeron::new(&ctx)?;
691        info!("starting client");
692
693        aeron.start()?;
694        info!("client started");
695        const STREAM_ID: i32 = 123;
696        let publisher =
697            aeron.add_publication(AERON_IPC_STREAM, STREAM_ID, Duration::from_secs(5))?;
698        info!("created publisher");
699
700        let subscription = aeron.add_subscription(
701            AERON_IPC_STREAM,
702            STREAM_ID,
703            Handlers::no_available_image_handler(),
704            Handlers::no_unavailable_image_handler(),
705            Duration::from_secs(5),
706        )?;
707        info!("created subscription");
708
709        // pick a large enough size to confirm fragement assembler is working
710        let string_len = 156;
711        info!("string length: {}", string_len);
712
713        let stop_publisher = Arc::new(AtomicBool::new(false));
714
715        let publisher_handler = {
716            let stop_publisher = stop_publisher.clone();
717            std::thread::spawn(move || {
718                let binding = "1".repeat(string_len);
719                let msg = binding.as_bytes();
720                let buffer = AeronBufferClaim::default();
721                loop {
722                    if stop_publisher.load(Ordering::Acquire) || publisher.is_closed() {
723                        break;
724                    }
725
726                    let result = publisher.try_claim(string_len, &buffer);
727
728                    if result < msg.len() as i64 {
729                        error!(
730                            "ERROR: failed to send message {:?}",
731                            AeronCError::from_code(result as i32)
732                        );
733                    } else {
734                        buffer.data().write_all(&msg).unwrap();
735                        buffer.commit().unwrap();
736                    }
737                }
738                info!("stopping publisher thread");
739            })
740        };
741
742        let count = Arc::new(AtomicUsize::new(0usize));
743        let count_copy = Arc::clone(&count);
744        let stop2 = stop.clone();
745
746        struct FragmentHandler {
747            count_copy: Arc<AtomicUsize>,
748            stop2: Arc<AtomicBool>,
749            string_len: usize,
750        }
751
752        impl AeronFragmentHandlerCallback for FragmentHandler {
753            fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], header: AeronHeader) {
754                assert_eq!(STREAM_ID, header.get_values().unwrap().frame.stream_id);
755                let header = header.get_values().unwrap();
756                let frame = header.frame();
757                let stream_id = frame.stream_id();
758                assert_eq!(STREAM_ID, stream_id);
759
760                self.count_copy.fetch_add(1, Ordering::SeqCst);
761
762                if buffer.len() != self.string_len {
763                    self.stop2.store(true, Ordering::SeqCst);
764                    error!(
765                        "ERROR: message was {} but was expecting {} [header={:?}]",
766                        buffer.len(),
767                        self.string_len,
768                        header
769                    );
770                    sleep(Duration::from_secs(1));
771                }
772
773                assert_eq!(buffer.len(), self.string_len);
774                assert_eq!(buffer, "1".repeat(self.string_len).as_bytes());
775            }
776        }
777
778        let (mut closure, mut inner_handler) =
779            Handler::leak_with_fragment_assembler(FragmentHandler {
780                count_copy,
781                stop2,
782                string_len,
783            })?;
784        let loop_result: Result<(), Box<dyn error::Error>> = {
785            let start_time = Instant::now();
786            loop {
787                if start_time.elapsed() > Duration::from_secs(120) {
788                    info!("Failed: exceeded 120-second timeout");
789                    break Err(Box::new(std::io::Error::new(
790                        std::io::ErrorKind::TimedOut,
791                        "Timeout exceeded",
792                    )));
793                }
794                let c = count.load(Ordering::SeqCst);
795                if c > 100 {
796                    break Ok(());
797                }
798                subscription.poll(Some(&closure), 128)?;
799            }
800        };
801
802        info!("stopping client");
803
804        stop_publisher.store(true, Ordering::SeqCst);
805
806        let _ = publisher_handler.join().unwrap();
807        drop(subscription);
808        drop(aeron);
809
810        stop.store(true, Ordering::SeqCst);
811        let _ = driver_handle.join().unwrap();
812        closure.release();
813        inner_handler.release();
814        error_handler.release();
815        loop_result?;
816        Ok(())
817    }
818
819    #[test]
820    #[serial]
821    pub fn counters() -> Result<(), Box<dyn error::Error>> {
822        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
823        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
824        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
825        media_driver_ctx.set_dir_delete_on_start(true)?;
826        media_driver_ctx.set_dir(
827            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
828        )?;
829        let (stop, driver_handle) =
830            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
831
832        let ctx = AeronContext::new()?;
833        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
834        assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
835        let mut error_handler = Handler::leak(ErrorCount::default());
836        ctx.set_error_handler(Some(&error_handler))?;
837        let mut unavailable_counter_handler = Handler::leak(AeronUnavailableCounterLogger);
838        ctx.set_on_unavailable_counter(Some(&unavailable_counter_handler))?;
839
840        struct AvailableCounterHandler {
841            found_counter: bool,
842        }
843
844        impl AeronAvailableCounterCallback for AvailableCounterHandler {
845            fn handle_aeron_on_available_counter(
846                &mut self,
847                counters_reader: AeronCountersReader,
848                registration_id: i64,
849                counter_id: i32,
850            ) -> () {
851                info!(
852            "on counter key={:?}, label={:?} registration_id={registration_id}, counter_id={counter_id}, value={}, {counters_reader:?}",
853            String::from_utf8(counters_reader.get_counter_key(counter_id).unwrap()),
854            counters_reader.get_counter_label(counter_id, 1000),
855            counters_reader.addr(counter_id)
856        );
857
858                assert_eq!(
859                    counters_reader.counter_registration_id(counter_id).unwrap(),
860                    registration_id
861                );
862
863                if let Ok(label) = counters_reader.get_counter_label(counter_id, 1000) {
864                    if label == "label_buffer" {
865                        self.found_counter = true;
866                        assert_eq!(
867                            &counters_reader.get_counter_key(counter_id).unwrap(),
868                            "key".as_bytes()
869                        );
870                    }
871                }
872            }
873        }
874
875        let mut available_counter_handler = Handler::leak(AvailableCounterHandler {
876            found_counter: false,
877        });
878        ctx.set_on_available_counter(Some(&available_counter_handler))?;
879
880        info!("creating client");
881        let aeron = Aeron::new(&ctx)?;
882        info!("starting client");
883
884        aeron.start()?;
885        info!("client started [counters test]");
886
887        let counter = aeron.add_counter(
888            123,
889            "key".as_bytes(),
890            "label_buffer",
891            Duration::from_secs(5),
892        )?;
893        let constants = counter.get_constants()?;
894        let counter_id = constants.counter_id;
895
896        let stop_publisher = Arc::new(AtomicBool::new(false));
897
898        let publisher_handler = {
899            let stop_publisher = stop_publisher.clone();
900            let counter = counter.clone();
901            std::thread::spawn(move || {
902                for _ in 0..150 {
903                    if stop_publisher.load(Ordering::Acquire) || counter.is_closed() {
904                        break;
905                    }
906                    counter.addr_atomic().fetch_add(1, Ordering::SeqCst);
907                }
908                info!("stopping publisher thread");
909            })
910        };
911
912        let now = Instant::now();
913        while counter.addr_atomic().load(Ordering::SeqCst) < 100
914            && now.elapsed() < Duration::from_secs(10)
915        {
916            sleep(Duration::from_micros(10));
917        }
918
919        assert!(now.elapsed() < Duration::from_secs(10));
920
921        info!(
922            "counter is {}",
923            counter.addr_atomic().load(Ordering::SeqCst)
924        );
925
926        info!("stopping client");
927
928        #[cfg(not(target_os = "windows"))] // not sure why windows version doesn't fire event
929        assert!(available_counter_handler.found_counter);
930
931        let reader = aeron.counters_reader();
932        assert_eq!(reader.get_counter_label(counter_id, 256)?, "label_buffer");
933        assert_eq!(reader.get_counter_key(counter_id)?, "key".as_bytes());
934        let buffers = AeronCountersReaderBuffers::default();
935        reader.get_buffers(&buffers)?;
936
937        stop_publisher.store(true, Ordering::SeqCst);
938
939        let _ = publisher_handler.join().unwrap();
940        drop(counter);
941        drop(aeron);
942
943        stop.store(true, Ordering::SeqCst);
944        let _ = driver_handle.join().unwrap();
945        available_counter_handler.release();
946        unavailable_counter_handler.release();
947        error_handler.release();
948        Ok(())
949    }
950
951    /// A simple error counter for testing error callback invocation.
952    #[derive(Default, Debug)]
953    struct TestErrorCount {
954        pub error_count: usize,
955    }
956
957    impl Drop for TestErrorCount {
958        fn drop(&mut self) {
959            info!("TestErrorCount dropped with {} errors", self.error_count);
960        }
961    }
962
963    impl AeronErrorHandlerCallback for TestErrorCount {
964        fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
965            error!("Aeron error {}: {}", error_code, msg);
966            self.error_count += 1;
967        }
968    }
969
970    #[test]
971    #[serial]
972    pub fn backpressure_recovery_test() -> Result<(), Box<dyn error::Error>> {
973        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
974
975        let under_valgrind = running_under_valgrind();
976        let driver_timeout_ms = if under_valgrind { 180_000 } else { 60_000 };
977        let liveness_timeout_ns = if under_valgrind {
978            180_000_000_000
979        } else {
980            60_000_000_000
981        };
982        let poll_timeout = Duration::from_millis(driver_timeout_ms as u64);
983
984        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
985        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
986        media_driver_ctx.set_dir_delete_on_start(true)?;
987        media_driver_ctx.set_dir(
988            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
989        )?;
990        media_driver_ctx.set_client_liveness_timeout_ns(liveness_timeout_ns)?;
991        media_driver_ctx.set_image_liveness_timeout_ns(liveness_timeout_ns)?;
992        media_driver_ctx.set_publication_unblock_timeout_ns(liveness_timeout_ns + 5_000_000_000)?;
993        media_driver_ctx.set_driver_timeout_ms(driver_timeout_ms)?;
994        let (stop, driver_handle) =
995            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
996
997        let ctx = AeronContext::new()?;
998        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
999        ctx.set_driver_timeout_ms(driver_timeout_ms)?;
1000        let mut error_handler = Handler::leak(TestErrorCount::default());
1001        ctx.set_error_handler(Some(&error_handler))?;
1002
1003        let aeron = Aeron::new(&ctx)?;
1004        aeron.start()?;
1005
1006        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
1007        let subscription = aeron.add_subscription(
1008            AERON_IPC_STREAM,
1009            123,
1010            Handlers::no_available_image_handler(),
1011            Handlers::no_unavailable_image_handler(),
1012            Duration::from_secs(5),
1013        )?;
1014
1015        let count = Arc::new(AtomicUsize::new(0));
1016        let start_time = Instant::now();
1017
1018        let stop_publisher = Arc::new(AtomicBool::new(false));
1019
1020        // Spawn a publisher thread that repeatedly sends "test" messages.
1021        let publisher_thread = {
1022            let stop_publisher = stop_publisher.clone();
1023            std::thread::spawn(move || {
1024                while !stop_publisher.load(Ordering::Acquire) {
1025                    let msg = b"test";
1026                    let result =
1027                        publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
1028                    // If backpressure is encountered, sleep a bit.
1029                    if result == AeronErrorType::PublicationBackPressured.code() as i64 {
1030                        sleep(Duration::from_millis(50));
1031                    }
1032                    if publisher.is_closed() {
1033                        break;
1034                    }
1035                }
1036            })
1037        };
1038
1039        // Poll using the inline closure via poll_once until we receive at least 50 messages.
1040        while count.load(Ordering::SeqCst) < 50 && start_time.elapsed() < poll_timeout {
1041            let _ = subscription.poll_once(
1042                |_msg, _header| {
1043                    count.fetch_add(1, Ordering::SeqCst);
1044                },
1045                128,
1046            )?;
1047        }
1048
1049        stop_publisher.store(true, Ordering::SeqCst);
1050        publisher_thread.join().unwrap();
1051        drop(subscription);
1052        drop(aeron);
1053        stop.store(true, Ordering::SeqCst);
1054        let _ = driver_handle.join().unwrap();
1055        error_handler.release();
1056
1057        assert!(
1058            count.load(Ordering::SeqCst) >= 50,
1059            "Expected at least 50 messages received"
1060        );
1061        Ok(())
1062    }
1063
1064    #[test]
1065    #[serial]
1066    pub fn multi_subscription_test() -> Result<(), Box<dyn error::Error>> {
1067        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
1068
1069        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
1070        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
1071        media_driver_ctx.set_dir_delete_on_start(true)?;
1072        media_driver_ctx.set_dir(
1073            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
1074        )?;
1075        media_driver_ctx.set_client_liveness_timeout_ns(60_000_000_000)?;
1076        media_driver_ctx.set_image_liveness_timeout_ns(60_000_000_000)?;
1077        media_driver_ctx.set_publication_unblock_timeout_ns(65_000_000_000)?;
1078        media_driver_ctx.set_driver_timeout_ms(60_000)?;
1079        let (_stop, driver_handle) =
1080            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1081
1082        let ctx = AeronContext::new()?;
1083        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
1084        let mut error_handler = Handler::leak(TestErrorCount::default());
1085        ctx.set_error_handler(Some(&error_handler))?;
1086
1087        let aeron = Aeron::new(&ctx)?;
1088        aeron.start()?;
1089        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
1090
1091        // Create two subscriptions on the same channel.
1092        let subscription1 = aeron.add_subscription(
1093            AERON_IPC_STREAM,
1094            123,
1095            Handlers::no_available_image_handler(),
1096            Handlers::no_unavailable_image_handler(),
1097            Duration::from_secs(5),
1098        )?;
1099        let subscription2 = aeron.add_subscription(
1100            AERON_IPC_STREAM,
1101            123,
1102            Handlers::no_available_image_handler(),
1103            Handlers::no_unavailable_image_handler(),
1104            Duration::from_secs(5),
1105        )?;
1106
1107        let count1 = Arc::new(AtomicUsize::new(0));
1108        let count2 = Arc::new(AtomicUsize::new(0));
1109
1110        // Publish a single message.
1111        let msg = b"hello multi-subscription";
1112        let result = publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
1113        assert!(
1114            result >= msg.len() as i64,
1115            "Message should be sent successfully"
1116        );
1117
1118        let start_time = Instant::now();
1119        // Poll both subscriptions with inline closures until each has received at least one message.
1120        while (count1.load(Ordering::SeqCst) < 1 || count2.load(Ordering::SeqCst) < 1)
1121            && start_time.elapsed() < Duration::from_secs(5)
1122        {
1123            let _ = subscription1.poll_once(
1124                |_msg, _header| {
1125                    count1.fetch_add(1, Ordering::SeqCst);
1126                },
1127                128,
1128            )?;
1129            let _ = subscription2.poll_once(
1130                |_msg, _header| {
1131                    count2.fetch_add(1, Ordering::SeqCst);
1132                },
1133                128,
1134            )?;
1135        }
1136
1137        assert!(
1138            count1.load(Ordering::SeqCst) >= 1,
1139            "Subscription 1 did not receive the message"
1140        );
1141        assert!(
1142            count2.load(Ordering::SeqCst) >= 1,
1143            "Subscription 2 did not receive the message"
1144        );
1145
1146        drop(subscription2);
1147        drop(subscription1);
1148        drop(publisher);
1149        drop(aeron);
1150        _stop.store(true, Ordering::SeqCst);
1151        let _ = driver_handle.join().unwrap();
1152        error_handler.release();
1153        Ok(())
1154    }
1155
1156    #[test]
1157    #[serial]
1158    pub fn should_be_able_to_drop_after_close_manually_being_closed(
1159    ) -> Result<(), Box<dyn error::Error>> {
1160        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
1161
1162        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
1163        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
1164        media_driver_ctx.set_dir_delete_on_start(true)?;
1165        media_driver_ctx.set_dir(
1166            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
1167        )?;
1168        let (_stop, driver_handle) =
1169            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1170
1171        let ctx = AeronContext::new()?;
1172        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
1173        let mut error_handler = Handler::leak(AeronErrorHandlerLogger);
1174        ctx.set_error_handler(Some(&error_handler))?;
1175
1176        let aeron = Aeron::new(&ctx)?;
1177        aeron.start()?;
1178
1179        {
1180            let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
1181            info!("created publication [sessionId={}]", publisher.session_id());
1182            publisher.close_with_no_args()?;
1183            drop(publisher);
1184        }
1185
1186        {
1187            let publisher = aeron.add_publication(AERON_IPC_STREAM, 124, Duration::from_secs(5))?;
1188            info!("created publication [sessionId={}]", publisher.session_id());
1189            publisher.close(Handlers::no_notification_handler())?;
1190            drop(publisher);
1191        }
1192
1193        {
1194            let publisher = aeron.add_publication(AERON_IPC_STREAM, 125, Duration::from_secs(5))?;
1195            publisher.close_once(|| println!("on close"))?;
1196            info!("created publication [sessionId={}]", publisher.session_id());
1197            drop(publisher);
1198        }
1199
1200        drop(aeron);
1201        _stop.store(true, Ordering::SeqCst);
1202        let _ = driver_handle.join().unwrap();
1203        error_handler.release();
1204        Ok(())
1205    }
1206
1207    #[test]
1208    #[serial]
1209    pub fn offer_on_closed_publication_error_test() -> Result<(), Box<dyn error::Error>> {
1210        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
1211
1212        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
1213        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
1214        media_driver_ctx.set_dir_delete_on_start(true)?;
1215        media_driver_ctx.set_dir(
1216            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
1217        )?;
1218        let (_stop, driver_handle) =
1219            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1220
1221        let ctx = AeronContext::new()?;
1222        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
1223        let mut error_handler = Handler::leak(TestErrorCount::default());
1224        ctx.set_error_handler(Some(&error_handler))?;
1225
1226        let aeron = Aeron::new(&ctx)?;
1227        aeron.start()?;
1228        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
1229
1230        // Close the publication immediately.
1231        publisher.close(Handlers::no_notification_handler())?;
1232
1233        // Attempt to send a message after the publication is closed.
1234        let result = publisher.offer(
1235            b"should fail",
1236            Handlers::no_reserved_value_supplier_handler(),
1237        );
1238        assert!(
1239            result < 0,
1240            "Offering on a closed publication should return a negative error code"
1241        );
1242
1243        drop(publisher);
1244        drop(aeron);
1245        _stop.store(true, Ordering::SeqCst);
1246        let _ = driver_handle.join().unwrap();
1247        error_handler.release();
1248        Ok(())
1249    }
1250
1251    /// Test sending and receiving an empty (zero-length) message using inline closures with poll_once.
1252    #[test]
1253    #[serial]
1254    pub fn empty_message_test() -> Result<(), Box<dyn error::Error>> {
1255        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
1256
1257        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
1258        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
1259        media_driver_ctx.set_dir_delete_on_start(true)?;
1260        media_driver_ctx.set_dir(
1261            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
1262        )?;
1263        let (_stop, driver_handle) =
1264            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1265
1266        let ctx = AeronContext::new()?;
1267        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
1268        let mut error_handler = Handler::leak(TestErrorCount::default());
1269        ctx.set_error_handler(Some(&error_handler))?;
1270
1271        let aeron = Aeron::new(&ctx)?;
1272        aeron.start()?;
1273        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
1274        let subscription = aeron.add_subscription(
1275            AERON_IPC_STREAM,
1276            123,
1277            Handlers::no_available_image_handler(),
1278            Handlers::no_unavailable_image_handler(),
1279            Duration::from_secs(5),
1280        )?;
1281
1282        let empty_received = Arc::new(AtomicBool::new(false));
1283        let start_time = Instant::now();
1284
1285        let result = publisher.offer(b"", Handlers::no_reserved_value_supplier_handler());
1286        assert!(result > 0);
1287
1288        while !empty_received.load(Ordering::SeqCst)
1289            && start_time.elapsed() < Duration::from_secs(5)
1290        {
1291            let _ = subscription.poll_once(
1292                |msg, _header| {
1293                    if msg.is_empty() {
1294                        empty_received.store(true, Ordering::SeqCst);
1295                    }
1296                },
1297                128,
1298            )?;
1299        }
1300
1301        assert!(
1302            empty_received.load(Ordering::SeqCst),
1303            "Empty message was not received"
1304        );
1305        drop(subscription);
1306        drop(publisher);
1307        drop(aeron);
1308        _stop.store(true, Ordering::SeqCst);
1309        let _ = driver_handle.join().unwrap();
1310        error_handler.release();
1311        Ok(())
1312    }
1313
1314    #[derive(Default, Debug)]
1315    struct MdcTotals {
1316        gap_events: u64,
1317        missing_messages: u64,
1318        received_messages: u64,
1319    }
1320
1321    #[derive(Debug)]
1322    struct MdcWindowStats {
1323        expected_seq: Option<u64>,
1324        gap_events: u64,
1325        missing_messages: u64,
1326        received_messages: u64,
1327        histogram: Histogram<u64>,
1328    }
1329
1330    impl MdcWindowStats {
1331        fn new() -> Result<Self, Box<dyn error::Error>> {
1332            Ok(Self {
1333                expected_seq: None,
1334                gap_events: 0,
1335                missing_messages: 0,
1336                received_messages: 0,
1337                histogram: Histogram::new(3)?,
1338            })
1339        }
1340
1341        fn observe(&mut self, seq: u64, sent_ts_ns: u64) {
1342            self.received_messages += 1;
1343
1344            match self.expected_seq {
1345                None => self.expected_seq = Some(seq.saturating_add(1)),
1346                Some(expected) if seq > expected => {
1347                    self.gap_events += 1;
1348                    self.missing_messages += seq - expected;
1349                    self.expected_seq = Some(seq.saturating_add(1));
1350                }
1351                Some(expected) if seq == expected => {
1352                    self.expected_seq = Some(expected.saturating_add(1));
1353                }
1354                Some(_) => {
1355                    // Ignore out-of-order/late packets for gap counting in this window.
1356                }
1357            }
1358
1359            let now_ns = Aeron::nano_clock().max(0) as u64;
1360            let latency_ns = now_ns.saturating_sub(sent_ts_ns);
1361            let _ = self.histogram.record(latency_ns);
1362        }
1363
1364        fn print_and_reset(
1365            &mut self,
1366            window_number: usize,
1367            interval: Duration,
1368            totals: &mut MdcTotals,
1369        ) {
1370            totals.gap_events += self.gap_events;
1371            totals.missing_messages += self.missing_messages;
1372            totals.received_messages += self.received_messages;
1373
1374            if self.histogram.len() > 0 {
1375                let min_us = self.histogram.min() / 1_000;
1376                let p50_us = self.histogram.value_at_quantile(0.50) / 1_000;
1377                let p99_us = self.histogram.value_at_quantile(0.99) / 1_000;
1378                let max_us = self.histogram.max() / 1_000;
1379                println!(
1380                    "[mdc-window-{window_number}] interval={interval:?} received={} gaps={} missing={} latency_us[min={}, p50={}, p99={}, max={}]",
1381                    self.received_messages, self.gap_events, self.missing_messages, min_us, p50_us, p99_us, max_us,
1382                );
1383            } else {
1384                println!(
1385                    "[mdc-window-{window_number}] interval={interval:?} received=0 gaps=0 missing=0 latency_us[min=n/a, p50=n/a, p99=n/a, max=n/a]"
1386                );
1387            }
1388
1389            self.expected_seq = None;
1390            self.gap_events = 0;
1391            self.missing_messages = 0;
1392            self.received_messages = 0;
1393            self.histogram.reset();
1394        }
1395    }
1396
1397    /// Run with loss profile on macOS:
1398    /// `just mdc-loss-run 120 10 0.10`
1399    ///
1400    /// The recipe handles PF setup and cleanup automatically.
1401    ///
1402    /// Manual cleanup (if needed):
1403    /// `sudo pfctl -a com.apple/rusteron-mdc-loss -F all`
1404    /// `sudo dnctl -q flush`
1405    /// `sudo pfctl -f /etc/pf.conf`
1406    #[test]
1407    #[serial]
1408    #[ignore] // Long-running diagnostics test for manual MDC with rolling latency/gap reports.
1409    pub fn mdc_unreliable_gap_latency_histogram_report() -> Result<(), Box<dyn error::Error>> {
1410        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
1411
1412        const STREAM_ID: i32 = 32931;
1413        const CONTROL_PORT: u16 = 32929;
1414        const SUBSCRIBER_PORT: u16 = 32930;
1415        const MESSAGE_LEN: usize = 130;
1416
1417        let report_interval = Duration::from_secs(
1418            std::env::var("RUSTERON_MDC_REPORT_INTERVAL_SECS")
1419                .ok()
1420                .and_then(|value| value.parse::<u64>().ok())
1421                .unwrap_or(10),
1422        );
1423        let test_duration = Duration::from_secs(
1424            std::env::var("RUSTERON_MDC_TEST_DURATION_SECS")
1425                .ok()
1426                .and_then(|value| value.parse::<u64>().ok())
1427                .unwrap_or(300),
1428        );
1429        let mdc_host = std::env::var("RUSTERON_MDC_HOST").unwrap_or("127.0.0.1".to_string());
1430
1431        let publication_channel = format!(
1432            "aeron:udp?control-mode=manual|control={}:{CONTROL_PORT}",
1433            mdc_host
1434        );
1435        // - group=false: do not apply MDC receiver-group semantics.
1436        // - nak-delay=100us: shorten unreliable-stream gap-fill decision latency.
1437        let subscription_channel = format!(
1438            "aeron:udp?endpoint={}:{SUBSCRIBER_PORT}|reliable=false|tether=false|group=false|nak-delay=500us",
1439            mdc_host
1440        );
1441        let destination_uri = format!("aeron:udp?endpoint={}:{SUBSCRIBER_PORT}", mdc_host);
1442
1443        let (media_driver_ctx, stop_driver, driver_handle) = start_media_driver(32930)?;
1444        let aeron_dir = media_driver_ctx.get_dir().to_string();
1445
1446        println!(
1447            "[mdc-start] publication={} subscription={} duration={:?} report_interval={:?}",
1448            publication_channel, subscription_channel, test_duration, report_interval
1449        );
1450
1451        let running = Arc::new(AtomicBool::new(true));
1452
1453        let subscriber_dir = aeron_dir.clone();
1454        let subscriber_channel = subscription_channel.clone();
1455        let subscriber_running = Arc::clone(&running);
1456        let subscriber_thread = std::thread::spawn(move || -> MdcTotals {
1457            let (_ctx, aeron) = create_client_for_dir(&subscriber_dir)
1458                .expect("failed to create subscriber aeron client");
1459
1460            let subscription = aeron
1461                .add_subscription(
1462                    &subscriber_channel.into_c_string(),
1463                    STREAM_ID,
1464                    Handlers::no_available_image_handler(),
1465                    Handlers::no_unavailable_image_handler(),
1466                    Duration::from_secs(5),
1467                )
1468                .expect("failed to create subscriber");
1469
1470            let mut totals = MdcTotals::default();
1471            let mut window_stats = MdcWindowStats::new().expect("failed to create histogram");
1472            let test_start = Instant::now();
1473            let mut window_start = test_start;
1474            let mut window_number = 1usize;
1475
1476            while test_start.elapsed() < test_duration {
1477                let _ = subscription
1478                    .poll_once(
1479                        |msg, _header| {
1480                            if msg.len() < 16 {
1481                                return;
1482                            }
1483
1484                            let seq = u64::from_le_bytes(msg[0..8].try_into().unwrap());
1485                            let sent_ts_ns = u64::from_le_bytes(msg[8..16].try_into().unwrap());
1486                            window_stats.observe(seq, sent_ts_ns);
1487                        },
1488                        10_000,
1489                    )
1490                    .expect("subscriber poll failed");
1491
1492                if window_start.elapsed() >= report_interval {
1493                    window_stats.print_and_reset(window_number, report_interval, &mut totals);
1494                    window_number += 1;
1495                    window_start = Instant::now();
1496                }
1497            }
1498
1499            window_stats.print_and_reset(window_number, report_interval, &mut totals);
1500            subscriber_running.store(false, Ordering::SeqCst);
1501            totals
1502        });
1503
1504        // Ensure subscriber has started before publisher setup.
1505        sleep(Duration::from_millis(250));
1506
1507        let publisher_dir = aeron_dir;
1508        let publisher_channel = publication_channel.clone();
1509        let publisher_destination = destination_uri.clone();
1510        let publisher_running = Arc::clone(&running);
1511        let publisher_thread = std::thread::spawn(move || -> u64 {
1512            let (_ctx, aeron) = create_client_for_dir(&publisher_dir)
1513                .expect("failed to create publisher aeron client");
1514
1515            let publication = aeron
1516                .add_exclusive_publication(
1517                    &publisher_channel.into_c_string(),
1518                    STREAM_ID,
1519                    Duration::from_secs(5),
1520                )
1521                .expect("failed to create publication");
1522
1523            let add_destination =
1524                AeronAsyncDestination::aeron_exclusive_publication_async_add_destination(
1525                    &aeron,
1526                    &publication,
1527                    &publisher_destination.into_c_string(),
1528                )
1529                .expect("failed to add manual MDC destination");
1530
1531            let add_destination_start = Instant::now();
1532            while add_destination
1533                .aeron_exclusive_publication_async_destination_poll()
1534                .expect("destination add poll failed")
1535                == 0
1536            {
1537                assert!(
1538                    add_destination_start.elapsed() <= Duration::from_secs(5),
1539                    "Timed out adding manual MDC destination"
1540                );
1541                sleep(Duration::from_millis(10));
1542            }
1543
1544            let connect_start = Instant::now();
1545            while !publication.is_connected() && connect_start.elapsed() < Duration::from_secs(5) {
1546                sleep(Duration::from_millis(10));
1547            }
1548            assert!(
1549                publication.is_connected(),
1550                "manual MDC publication did not connect to subscriber destination"
1551            );
1552
1553            let mut seq: u64 = 0;
1554            let mut payload = [0u8; MESSAGE_LEN];
1555            while publisher_running.load(Ordering::Acquire) {
1556                payload[0..8].copy_from_slice(&seq.to_le_bytes());
1557                let ts_ns = Aeron::nano_clock().max(0) as u64;
1558                payload[8..16].copy_from_slice(&ts_ns.to_le_bytes());
1559
1560                let result =
1561                    publication.offer(&payload, Handlers::no_reserved_value_supplier_handler());
1562                if result > 0 {
1563                    seq = seq.wrapping_add(1);
1564                }
1565                sleep(Duration::from_millis(1));
1566            }
1567            seq
1568        });
1569
1570        let totals = subscriber_thread.join().unwrap();
1571        running.store(false, Ordering::SeqCst);
1572        let sent_messages = publisher_thread.join().unwrap();
1573        stop_driver.store(true, Ordering::SeqCst);
1574        let _ = driver_handle.join().unwrap();
1575
1576        println!(
1577            "[mdc-summary] sent={} received={} total_gaps={} total_missing={}",
1578            sent_messages, totals.received_messages, totals.gap_events, totals.missing_messages
1579        );
1580
1581        assert!(sent_messages > 0, "publisher failed to send any messages");
1582        assert!(
1583            totals.received_messages > 0,
1584            "subscriber did not receive any messages"
1585        );
1586        Ok(())
1587    }
1588
1589    #[test]
1590    #[serial]
1591    #[ignore] // need to work to get tags working properly, its more of testing issue then tag issue
1592    pub fn tags() -> Result<(), Box<dyn error::Error>> {
1593        rusteron_code_gen::test_logger::init(log::LevelFilter::Debug);
1594
1595        let (md_ctx, stop, md) = start_media_driver(1)?;
1596
1597        let (_a_ctx2, aeron_sub) = create_client(&md_ctx)?;
1598
1599        info!("creating suscriber 1");
1600        let sub = aeron_sub
1601            .add_subscription(
1602                &"aeron:udp?tags=100".into_c_string(),
1603                123,
1604                Handlers::no_available_image_handler(),
1605                Handlers::no_unavailable_image_handler(),
1606                Duration::from_secs(50),
1607            )
1608            .map_err(|e| {
1609                error!("aeron error={}", Aeron::errmsg());
1610                e
1611            })?;
1612
1613        let ctx = AeronContext::new()?;
1614        ctx.set_dir(&aeron_sub.context().get_dir().into_c_string())?;
1615        let aeron = Aeron::new(&ctx)?;
1616        aeron.start()?;
1617
1618        info!("creating suscriber 2");
1619        let sub2 = aeron_sub.add_subscription(
1620            &"aeron:udp?tags=100".into_c_string(),
1621            123,
1622            Handlers::no_available_image_handler(),
1623            Handlers::no_unavailable_image_handler(),
1624            Duration::from_secs(50),
1625        )?;
1626
1627        let (_a_ctx1, aeron_pub) = create_client(&md_ctx)?;
1628        info!("creating publisher");
1629        assert!(!aeron_pub.is_closed());
1630        let publisher = aeron_pub
1631            .add_publication(
1632                &"aeron:udp?endpoint=localhost:4040|alias=test|tags=100".into_c_string(),
1633                123,
1634                Duration::from_secs(5),
1635            )
1636            .map_err(|e| {
1637                error!("aeron error={}", Aeron::errmsg());
1638                e
1639            })?;
1640
1641        info!("publishing msg");
1642
1643        loop {
1644            let result = publisher.offer(
1645                "213".as_bytes(),
1646                Handlers::no_reserved_value_supplier_handler(),
1647            );
1648            if result < 0 {
1649                error!(
1650                    "failed to publish {:?}",
1651                    AeronCError::from_code(result as i32)
1652                );
1653            } else {
1654                break;
1655            }
1656        }
1657
1658        sub.poll_once(
1659            |msg, _header| {
1660                println!("Received message: {:?}", msg);
1661            },
1662            128,
1663        )?;
1664        sub2.poll_once(
1665            |msg, _header| {
1666                println!("Received message: {:?}", msg);
1667            },
1668            128,
1669        )?;
1670
1671        stop.store(true, Ordering::SeqCst);
1672
1673        Ok(())
1674    }
1675
1676    fn create_client_for_dir(dir: &str) -> Result<(AeronContext, Aeron), Box<dyn Error>> {
1677        info!("creating aeron client [dir={}]", dir);
1678        let ctx = AeronContext::new()?;
1679        ctx.set_dir(&dir.into_c_string())?;
1680        let aeron = Aeron::new(&ctx)?;
1681        aeron.start()?;
1682        Ok((ctx, aeron))
1683    }
1684
1685    fn create_client(
1686        media_driver_ctx: &AeronDriverContext,
1687    ) -> Result<(AeronContext, Aeron), Box<dyn Error>> {
1688        let dir = media_driver_ctx.get_dir().to_string();
1689        create_client_for_dir(&dir)
1690    }
1691
1692    fn start_media_driver(
1693        instance: u64,
1694    ) -> Result<
1695        (
1696            AeronDriverContext,
1697            Arc<AtomicBool>,
1698            JoinHandle<Result<(), rusteron_media_driver::AeronCError>>,
1699        ),
1700        Box<dyn Error>,
1701    > {
1702        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
1703        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
1704        media_driver_ctx.set_dir_delete_on_start(true)?;
1705        media_driver_ctx.set_dir(
1706            &format!(
1707                "{}{}-{}",
1708                media_driver_ctx.get_dir(),
1709                Aeron::epoch_clock(),
1710                instance
1711            )
1712            .into_c_string(),
1713        )?;
1714        let (stop, driver_handle) =
1715            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1716        Ok((media_driver_ctx, stop, driver_handle))
1717    }
1718
1719    #[doc = include_str!("../../README.md")]
1720    mod readme_tests {}
1721}