rusteron_client/
lib.rs

1#![allow(non_upper_case_globals)]
2#![allow(non_camel_case_types)]
3#![allow(non_snake_case)]
4#![allow(clippy::all)]
5#![allow(unused_unsafe)]
6#![allow(unused_variables)]
7#![doc = include_str!("../README.md")]
8//! # Features
9//!
10//! - **`static`**: When enabled, this feature statically links the Aeron C code.
11//!   By default, the library uses dynamic linking to the Aeron C libraries.
12//! - **`backtrace`**: When enabled will log a backtrace for each AeronCError
13//! - **`extra-logging`**: When enabled will log when resource is created and destroyed. Useful if you're seeing a segfault due to a resource being closed
14//! - **`log-c-bindings`**: When enabled will log every C binding call with arguments and return values. Useful for debugging FFI interactions
15//! - **`precompile`**: When enabled will use precompiled C code instead of requiring cmake and java to be installed
16
/// Contents of `$OUT_DIR/bindings.rs`, included verbatim at compile time
/// (presumably the generated Aeron C bindings — confirm against the build script).
pub mod bindings {
    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
}
20
21use bindings::*;
22
// Splice `$OUT_DIR/aeron.rs` and `$OUT_DIR/aeron_custom.rs` into the crate root at
// compile time, so the items they define live at the top level of this crate.
include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
25
26#[cfg(test)]
27mod tests {
28    use super::*;
29    use crate::test_alloc::current_allocs;
30    use log::{error, info};
31    use rusteron_media_driver::AeronDriverContext;
32    use serial_test::serial;
33    use std::error;
34    use std::error::Error;
35    use std::io::Write;
36    use std::os::raw::c_int;
37    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
38    use std::sync::Arc;
39    use std::thread::{sleep, JoinHandle};
40    use std::time::{Duration, Instant};
41
42    #[derive(Default, Debug)]
43    struct ErrorCount {
44        error_count: usize,
45    }
46
47    impl AeronErrorHandlerCallback for ErrorCount {
48        fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
49            error!("Aeron error {}: {}", error_code, msg);
50            self.error_count += 1;
51        }
52    }
53
54    #[test]
55    #[serial]
56    fn version_check() -> Result<(), Box<dyn error::Error>> {
57        unsafe {
58            aeron_randomised_int32();
59        }
60        let alloc_count = current_allocs();
61
62        {
63            let major = unsafe { crate::aeron_version_major() };
64            let minor = unsafe { crate::aeron_version_minor() };
65            let patch = unsafe { crate::aeron_version_patch() };
66
67            let cargo_version = "1.48.6";
68            let aeron_version = format!("{}.{}.{}", major, minor, patch);
69            assert_eq!(aeron_version, cargo_version);
70
71            let ctx = AeronContext::new()?;
72            let error_count = 1;
73            let mut handler = Handler::leak(ErrorCount::default());
74            ctx.set_error_handler(Some(&handler))?;
75
76            assert!(Aeron::epoch_clock() > 0);
77            drop(ctx);
78            assert!(handler.should_drop);
79            handler.release();
80            assert!(!handler.should_drop);
81            drop(handler);
82        }
83
84        assert!(
85            current_allocs() <= alloc_count,
86            "allocations {} > {alloc_count}",
87            current_allocs()
88        );
89
90        Ok(())
91    }
92
93    #[test]
94    #[serial]
95    pub fn simple_large_send() -> Result<(), Box<dyn error::Error>> {
96        let _ = env_logger::Builder::new()
97            .is_test(true)
98            .filter_level(log::LevelFilter::Info)
99            .try_init();
100        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
101        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
102        media_driver_ctx.set_dir_delete_on_start(true)?;
103        media_driver_ctx.set_dir(
104            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
105        )?;
106        let (stop, driver_handle) =
107            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
108
109        let ctx = AeronContext::new()?;
110        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
111        assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
112        let error_count = 1;
113        ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
114        ctx.set_on_new_publication(Some(&Handler::leak(AeronNewPublicationLogger)))?;
115        ctx.set_on_available_counter(Some(&Handler::leak(AeronAvailableCounterLogger)))?;
116        ctx.set_on_close_client(Some(&Handler::leak(AeronCloseClientLogger)))?;
117        ctx.set_on_new_subscription(Some(&Handler::leak(AeronNewSubscriptionLogger)))?;
118        ctx.set_on_unavailable_counter(Some(&Handler::leak(AeronUnavailableCounterLogger)))?;
119        ctx.set_on_available_counter(Some(&Handler::leak(AeronAvailableCounterLogger)))?;
120        ctx.set_on_new_exclusive_publication(Some(&Handler::leak(AeronNewPublicationLogger)))?;
121
122        info!("creating client [simple_large_send test]");
123        let aeron = Aeron::new(&ctx)?;
124        info!("starting client");
125
126        aeron.start()?;
127        info!("client started");
128        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
129        info!("created publisher");
130
131        assert!(AeronCncMetadata::load_from_file(ctx.get_dir())?.pid > 0);
132        let cstr = std::ffi::CString::new(ctx.get_dir()).unwrap();
133        AeronCncMetadata::read_from_file(&cstr, |cnc| {
134            assert!(cnc.pid > 0);
135        })?;
136        assert!(AeronCnc::new_on_heap(ctx.get_dir())?.get_to_driver_heartbeat_ms()? > 0);
137        let cstr = std::ffi::CString::new(ctx.get_dir()).unwrap();
138        for _ in 0..50 {
139            AeronCnc::read_on_partial_stack(&cstr, |cnc| {
140                assert!(cnc.get_to_driver_heartbeat_ms().unwrap() > 0);
141            })?;
142        }
143
144        let subscription = aeron.add_subscription(
145            AERON_IPC_STREAM,
146            123,
147            Handlers::no_available_image_handler(),
148            Handlers::no_unavailable_image_handler(),
149            Duration::from_secs(5),
150        )?;
151        info!("created subscription");
152
153        subscription
154            .poll_once(|msg, header| println!("foo"), 1024)
155            .unwrap();
156
157        // pick a large enough size to confirm fragement assembler is working
158        let string_len = media_driver_ctx.ipc_mtu_length * 100;
159        info!("string length: {}", string_len);
160
161        let publisher_handler = {
162            let stop = stop.clone();
163            std::thread::spawn(move || {
164                let binding = "1".repeat(string_len);
165                let large_msg = binding.as_bytes();
166                loop {
167                    if stop.load(Ordering::Acquire) || publisher.is_closed() {
168                        break;
169                    }
170                    let result =
171                        publisher.offer(large_msg, Handlers::no_reserved_value_supplier_handler());
172
173                    assert_eq!(123, publisher.get_constants().unwrap().stream_id);
174
175                    if result < large_msg.len() as i64 {
176                        let error = AeronCError::from_code(result as i32);
177                        match error.kind() {
178                            AeronErrorType::PublicationBackPressured
179                            | AeronErrorType::PublicationAdminAction => {
180                                // ignore
181                            }
182                            _ => {
183                                error!(
184                                    "ERROR: failed to send message {:?}",
185                                    AeronCError::from_code(result as i32)
186                                );
187                            }
188                        }
189                        sleep(Duration::from_millis(500));
190                    }
191                }
192                info!("stopping publisher thread");
193            })
194        };
195
196        let mut assembler = AeronFragmentClosureAssembler::new()?;
197
198        struct Context {
199            count: Arc<AtomicUsize>,
200            stop: Arc<AtomicBool>,
201            string_len: usize,
202        }
203
204        let count = Arc::new(AtomicUsize::new(0usize));
205        let mut context = Context {
206            count: count.clone(),
207            stop: stop.clone(),
208            string_len,
209        };
210
211        // Start the timer
212        let start_time = Instant::now();
213
214        loop {
215            if start_time.elapsed() > Duration::from_secs(30) {
216                info!("Failed: exceeded 30-second timeout");
217                return Err(Box::new(std::io::Error::new(
218                    std::io::ErrorKind::TimedOut,
219                    "Timeout exceeded",
220                )));
221            }
222            let c = count.load(Ordering::SeqCst);
223            if c > 100 {
224                break;
225            }
226
227            fn process_msg(ctx: &mut Context, buffer: &[u8], header: AeronHeader) {
228                ctx.count.fetch_add(1, Ordering::SeqCst);
229
230                let values = header.get_values().unwrap();
231                assert_ne!(values.frame.session_id, 0);
232
233                if buffer.len() != ctx.string_len {
234                    ctx.stop.store(true, Ordering::SeqCst);
235                    error!(
236                        "ERROR: message was {} but was expecting {} [header={:?}]",
237                        buffer.len(),
238                        ctx.string_len,
239                        header
240                    );
241                    sleep(Duration::from_secs(1));
242                }
243
244                assert_eq!(buffer.len(), ctx.string_len);
245                assert_eq!(buffer, "1".repeat(ctx.string_len).as_bytes());
246            }
247
248            subscription.poll(assembler.process(&mut context, process_msg), 128)?;
249            assert_eq!(123, subscription.get_constants().unwrap().stream_id);
250        }
251
252        subscription.close(Handlers::no_notification_handler())?;
253
254        info!("stopping client");
255        stop.store(true, Ordering::SeqCst);
256
257        let _ = publisher_handler.join().unwrap();
258        let _ = driver_handle.join().unwrap();
259
260        let cnc = AeronCnc::new_on_heap(ctx.get_dir())?;
261        cnc.counters_reader().foreach_counter_once(
262            |value: i64, id: i32, type_id: i32, key: &[u8], label: &str| {
263                println!("counter reader id={id}, type_id={type_id}, key={key:?}, label={label}, value={value} [type={:?}]",
264                AeronSystemCounterType::try_from(type_id));
265            },
266        );
267        cnc.error_log_read_once(| observation_count: i32,
268                                     first_observation_timestamp: i64,
269                                     last_observation_timestamp: i64,
270                                     error: &str| {
271            println!("error: {error} observationCount={observation_count}, first_observation_timestamp={first_observation_timestamp}, last_observation_timestamp={last_observation_timestamp}");
272        }, 0);
273        cnc.loss_reporter_read_once(|    observation_count: i64,
274                                    total_bytes_lost: i64,
275                                    first_observation_timestamp: i64,
276                                    last_observation_timestamp: i64,
277                                    session_id: i32,
278                                    stream_id: i32,
279                                    channel: &str,
280                                    source: &str,| {
281            println!("loss reporter observationCount={observation_count}, totalBytesLost={total_bytes_lost}, first_observed={first_observation_timestamp}, last_observed={last_observation_timestamp}, session_id={session_id}, stream_id={stream_id}, channel={channel} source={source}");
282        })?;
283
284        Ok(())
285    }
286
287    #[test]
288    #[serial]
289    pub fn try_claim() -> Result<(), Box<dyn error::Error>> {
290        let _ = env_logger::Builder::new()
291            .is_test(true)
292            .filter_level(log::LevelFilter::Info)
293            .try_init();
294        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
295        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
296        media_driver_ctx.set_dir_delete_on_start(true)?;
297        media_driver_ctx.set_dir(
298            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
299        )?;
300        let (stop, driver_handle) =
301            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
302
303        let ctx = AeronContext::new()?;
304        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
305        assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
306        ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
307
308        info!("creating client [try_claim test]");
309        let aeron = Aeron::new(&ctx)?;
310        info!("starting client");
311
312        aeron.start()?;
313        info!("client started");
314        const STREAM_ID: i32 = 123;
315        let publisher =
316            aeron.add_publication(AERON_IPC_STREAM, STREAM_ID, Duration::from_secs(5))?;
317        info!("created publisher");
318
319        let subscription = aeron.add_subscription(
320            AERON_IPC_STREAM,
321            STREAM_ID,
322            Handlers::no_available_image_handler(),
323            Handlers::no_unavailable_image_handler(),
324            Duration::from_secs(5),
325        )?;
326        info!("created subscription");
327
328        // pick a large enough size to confirm fragement assembler is working
329        let string_len = 156;
330        info!("string length: {}", string_len);
331
332        let publisher_handler = {
333            let stop = stop.clone();
334            std::thread::spawn(move || {
335                let binding = "1".repeat(string_len);
336                let msg = binding.as_bytes();
337                let buffer = AeronBufferClaim::default();
338                loop {
339                    if stop.load(Ordering::Acquire) || publisher.is_closed() {
340                        break;
341                    }
342
343                    let result = publisher.try_claim(string_len, &buffer);
344
345                    if result < msg.len() as i64 {
346                        error!(
347                            "ERROR: failed to send message {:?}",
348                            AeronCError::from_code(result as i32)
349                        );
350                    } else {
351                        buffer.data().write_all(&msg).unwrap();
352                        buffer.commit().unwrap();
353                    }
354                }
355                info!("stopping publisher thread");
356            })
357        };
358
359        let count = Arc::new(AtomicUsize::new(0usize));
360        let count_copy = Arc::clone(&count);
361        let stop2 = stop.clone();
362
363        struct FragmentHandler {
364            count_copy: Arc<AtomicUsize>,
365            stop2: Arc<AtomicBool>,
366            string_len: usize,
367        }
368
369        impl AeronFragmentHandlerCallback for FragmentHandler {
370            fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], header: AeronHeader) {
371                assert_eq!(STREAM_ID, header.get_values().unwrap().frame.stream_id);
372                let header = header.get_values().unwrap();
373                let frame = header.frame();
374                let stream_id = frame.stream_id();
375                assert_eq!(STREAM_ID, stream_id);
376
377                self.count_copy.fetch_add(1, Ordering::SeqCst);
378
379                if buffer.len() != self.string_len {
380                    self.stop2.store(true, Ordering::SeqCst);
381                    error!(
382                        "ERROR: message was {} but was expecting {} [header={:?}]",
383                        buffer.len(),
384                        self.string_len,
385                        header
386                    );
387                    sleep(Duration::from_secs(1));
388                }
389
390                assert_eq!(buffer.len(), self.string_len);
391                assert_eq!(buffer, "1".repeat(self.string_len).as_bytes());
392            }
393        }
394
395        let (closure, _inner) = Handler::leak_with_fragment_assembler(FragmentHandler {
396            count_copy,
397            stop2,
398            string_len,
399        })?;
400        let start_time = Instant::now();
401
402        loop {
403            if start_time.elapsed() > Duration::from_secs(30) {
404                info!("Failed: exceeded 30-second timeout");
405                return Err(Box::new(std::io::Error::new(
406                    std::io::ErrorKind::TimedOut,
407                    "Timeout exceeded",
408                )));
409            }
410            let c = count.load(Ordering::SeqCst);
411            if c > 100 {
412                break;
413            }
414            subscription.poll(Some(&closure), 128)?;
415        }
416
417        info!("stopping client");
418
419        stop.store(true, Ordering::SeqCst);
420
421        let _ = publisher_handler.join().unwrap();
422        let _ = driver_handle.join().unwrap();
423        Ok(())
424    }
425
426    #[test]
427    #[serial]
428    pub fn counters() -> Result<(), Box<dyn error::Error>> {
429        let _ = env_logger::Builder::new()
430            .is_test(true)
431            .filter_level(log::LevelFilter::Info)
432            .try_init();
433        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
434        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
435        media_driver_ctx.set_dir_delete_on_start(true)?;
436        media_driver_ctx.set_dir(
437            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
438        )?;
439        let (stop, driver_handle) =
440            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
441
442        let ctx = AeronContext::new()?;
443        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
444        assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
445        ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
446        ctx.set_on_unavailable_counter(Some(&Handler::leak(AeronUnavailableCounterLogger)))?;
447
448        struct AvailableCounterHandler {
449            found_counter: bool,
450        }
451
452        impl AeronAvailableCounterCallback for AvailableCounterHandler {
453            fn handle_aeron_on_available_counter(
454                &mut self,
455                counters_reader: AeronCountersReader,
456                registration_id: i64,
457                counter_id: i32,
458            ) -> () {
459                info!(
460            "on counter key={:?}, label={:?} registration_id={registration_id}, counter_id={counter_id}, value={}, {counters_reader:?}",
461            String::from_utf8(counters_reader.get_counter_key(counter_id).unwrap()),
462            counters_reader.get_counter_label(counter_id, 1000),
463            counters_reader.addr(counter_id)
464        );
465
466                assert_eq!(
467                    counters_reader.counter_registration_id(counter_id).unwrap(),
468                    registration_id
469                );
470
471                if let Ok(label) = counters_reader.get_counter_label(counter_id, 1000) {
472                    if label == "label_buffer" {
473                        self.found_counter = true;
474                        assert_eq!(
475                            &counters_reader.get_counter_key(counter_id).unwrap(),
476                            "key".as_bytes()
477                        );
478                    }
479                }
480            }
481        }
482
483        let handler = &Handler::leak(AvailableCounterHandler {
484            found_counter: false,
485        });
486        ctx.set_on_available_counter(Some(handler))?;
487
488        info!("creating client");
489        let aeron = Aeron::new(&ctx)?;
490        info!("starting client");
491
492        aeron.start()?;
493        info!("client started [counters test]");
494
495        let counter = aeron.add_counter(
496            123,
497            "key".as_bytes(),
498            "label_buffer",
499            Duration::from_secs(5),
500        )?;
501        let constants = counter.get_constants()?;
502        let counter_id = constants.counter_id;
503
504        let publisher_handler = {
505            let stop = stop.clone();
506            let counter = counter.clone();
507            std::thread::spawn(move || {
508                for _ in 0..150 {
509                    if stop.load(Ordering::Acquire) || counter.is_closed() {
510                        break;
511                    }
512                    counter.addr_atomic().fetch_add(1, Ordering::SeqCst);
513                }
514                info!("stopping publisher thread");
515            })
516        };
517
518        let now = Instant::now();
519        while counter.addr_atomic().load(Ordering::SeqCst) < 100
520            && now.elapsed() < Duration::from_secs(10)
521        {
522            sleep(Duration::from_micros(10));
523        }
524
525        assert!(now.elapsed() < Duration::from_secs(10));
526
527        info!(
528            "counter is {}",
529            counter.addr_atomic().load(Ordering::SeqCst)
530        );
531
532        info!("stopping client");
533
534        #[cfg(not(target_os = "windows"))] // not sure why windows version doesn't fire event
535        assert!(handler.found_counter);
536
537        stop.store(true, Ordering::SeqCst);
538
539        let reader = aeron.counters_reader();
540        assert_eq!(reader.get_counter_label(counter_id, 256)?, "label_buffer");
541        assert_eq!(reader.get_counter_key(counter_id)?, "key".as_bytes());
542        let buffers = AeronCountersReaderBuffers::default();
543        reader.get_buffers(&buffers)?;
544
545        let _ = publisher_handler.join().unwrap();
546        let _ = driver_handle.join().unwrap();
547        Ok(())
548    }
549
550    /// A simple error counter for testing error callback invocation.
551    #[derive(Default, Debug)]
552    struct TestErrorCount {
553        pub error_count: usize,
554    }
555
556    impl Drop for TestErrorCount {
557        fn drop(&mut self) {
558            info!("TestErrorCount dropped with {} errors", self.error_count);
559        }
560    }
561
562    impl AeronErrorHandlerCallback for TestErrorCount {
563        fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
564            error!("Aeron error {}: {}", error_code, msg);
565            self.error_count += 1;
566        }
567    }
568
569    #[test]
570    #[serial]
571    pub fn backpressure_recovery_test() -> Result<(), Box<dyn error::Error>> {
572        let _ = env_logger::Builder::new()
573            .is_test(true)
574            .filter_level(log::LevelFilter::Info)
575            .try_init();
576
577        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
578        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
579        media_driver_ctx.set_dir_delete_on_start(true)?;
580        media_driver_ctx.set_dir(
581            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
582        )?;
583        let (stop, driver_handle) =
584            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
585
586        let ctx = AeronContext::new()?;
587        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
588        ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
589
590        let aeron = Aeron::new(&ctx)?;
591        aeron.start()?;
592
593        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
594        let subscription = aeron.add_subscription(
595            AERON_IPC_STREAM,
596            123,
597            Handlers::no_available_image_handler(),
598            Handlers::no_unavailable_image_handler(),
599            Duration::from_secs(5),
600        )?;
601
602        let count = Arc::new(AtomicUsize::new(0));
603        let start_time = Instant::now();
604
605        // Spawn a publisher thread that repeatedly sends "test" messages.
606        let publisher_thread = {
607            let stop = stop.clone();
608            std::thread::spawn(move || {
609                while !stop.load(Ordering::Acquire) {
610                    let msg = b"test";
611                    let result =
612                        publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
613                    // If backpressure is encountered, sleep a bit.
614                    if result == AeronErrorType::PublicationBackPressured.code() as i64 {
615                        sleep(Duration::from_millis(50));
616                    }
617                }
618            })
619        };
620
621        // Poll using the inline closure via poll_once until we receive at least 50 messages.
622        while count.load(Ordering::SeqCst) < 50 && start_time.elapsed() < Duration::from_secs(10) {
623            let _ = subscription.poll_once(
624                |_msg, _header| {
625                    count.fetch_add(1, Ordering::SeqCst);
626                },
627                128,
628            )?;
629        }
630
631        stop.store(true, Ordering::SeqCst);
632        publisher_thread.join().unwrap();
633        let _ = driver_handle.join().unwrap();
634
635        assert!(
636            count.load(Ordering::SeqCst) >= 50,
637            "Expected at least 50 messages received"
638        );
639        Ok(())
640    }
641
642    #[test]
643    #[serial]
644    pub fn multi_subscription_test() -> Result<(), Box<dyn error::Error>> {
645        let _ = env_logger::Builder::new()
646            .is_test(true)
647            .filter_level(log::LevelFilter::Info)
648            .try_init();
649
650        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
651        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
652        media_driver_ctx.set_dir_delete_on_start(true)?;
653        media_driver_ctx.set_dir(
654            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
655        )?;
656        let (_stop, driver_handle) =
657            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
658
659        let ctx = AeronContext::new()?;
660        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
661        ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
662
663        let aeron = Aeron::new(&ctx)?;
664        aeron.start()?;
665        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
666
667        // Create two subscriptions on the same channel.
668        let subscription1 = aeron.add_subscription(
669            AERON_IPC_STREAM,
670            123,
671            Handlers::no_available_image_handler(),
672            Handlers::no_unavailable_image_handler(),
673            Duration::from_secs(5),
674        )?;
675        let subscription2 = aeron.add_subscription(
676            AERON_IPC_STREAM,
677            123,
678            Handlers::no_available_image_handler(),
679            Handlers::no_unavailable_image_handler(),
680            Duration::from_secs(5),
681        )?;
682
683        let count1 = Arc::new(AtomicUsize::new(0));
684        let count2 = Arc::new(AtomicUsize::new(0));
685
686        // Publish a single message.
687        let msg = b"hello multi-subscription";
688        let result = publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
689        assert!(
690            result >= msg.len() as i64,
691            "Message should be sent successfully"
692        );
693
694        let start_time = Instant::now();
695        // Poll both subscriptions with inline closures until each has received at least one message.
696        while (count1.load(Ordering::SeqCst) < 1 || count2.load(Ordering::SeqCst) < 1)
697            && start_time.elapsed() < Duration::from_secs(5)
698        {
699            let _ = subscription1.poll_once(
700                |_msg, _header| {
701                    count1.fetch_add(1, Ordering::SeqCst);
702                },
703                128,
704            )?;
705            let _ = subscription2.poll_once(
706                |_msg, _header| {
707                    count2.fetch_add(1, Ordering::SeqCst);
708                },
709                128,
710            )?;
711        }
712
713        assert!(
714            count1.load(Ordering::SeqCst) >= 1,
715            "Subscription 1 did not receive the message"
716        );
717        assert!(
718            count2.load(Ordering::SeqCst) >= 1,
719            "Subscription 2 did not receive the message"
720        );
721
722        _stop.store(true, Ordering::SeqCst);
723        let _ = driver_handle.join().unwrap();
724        Ok(())
725    }
726
727    #[test]
728    #[serial]
729    pub fn should_be_able_to_drop_after_close_manually_being_closed(
730    ) -> Result<(), Box<dyn error::Error>> {
731        let _ = env_logger::Builder::new()
732            .is_test(true)
733            .filter_level(log::LevelFilter::Info)
734            .try_init();
735
736        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
737        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
738        media_driver_ctx.set_dir_delete_on_start(true)?;
739        media_driver_ctx.set_dir(
740            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
741        )?;
742        let (_stop, driver_handle) =
743            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
744
745        let ctx = AeronContext::new()?;
746        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
747        ctx.set_error_handler(Some(&Handler::leak(AeronErrorHandlerLogger)))?;
748
749        let aeron = Aeron::new(&ctx)?;
750        aeron.start()?;
751
752        {
753            let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
754            info!("created publication [sessionId={}]", publisher.session_id());
755            publisher.close_with_no_args()?;
756            drop(publisher);
757        }
758
759        {
760            let publisher = aeron.add_publication(AERON_IPC_STREAM, 124, Duration::from_secs(5))?;
761            info!("created publication [sessionId={}]", publisher.session_id());
762            publisher.close(Handlers::no_notification_handler())?;
763            drop(publisher);
764        }
765
766        {
767            let publisher = aeron.add_publication(AERON_IPC_STREAM, 125, Duration::from_secs(5))?;
768            publisher.close_once(|| println!("on close"))?;
769            info!("created publication [sessionId={}]", publisher.session_id());
770            drop(publisher);
771        }
772
773        Ok(())
774    }
775
776    #[test]
777    #[serial]
778    pub fn offer_on_closed_publication_error_test() -> Result<(), Box<dyn error::Error>> {
779        let _ = env_logger::Builder::new()
780            .is_test(true)
781            .filter_level(log::LevelFilter::Info)
782            .try_init();
783
784        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
785        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
786        media_driver_ctx.set_dir_delete_on_start(true)?;
787        media_driver_ctx.set_dir(
788            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
789        )?;
790        let (_stop, driver_handle) =
791            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
792
793        let ctx = AeronContext::new()?;
794        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
795        ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
796
797        let aeron = Aeron::new(&ctx)?;
798        aeron.start()?;
799        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
800
801        // Close the publication immediately.
802        publisher.close(Handlers::no_notification_handler())?;
803
804        // Attempt to send a message after the publication is closed.
805        let result = publisher.offer(
806            b"should fail",
807            Handlers::no_reserved_value_supplier_handler(),
808        );
809        assert!(
810            result < 0,
811            "Offering on a closed publication should return a negative error code"
812        );
813
814        _stop.store(true, Ordering::SeqCst);
815        let _ = driver_handle.join().unwrap();
816        Ok(())
817    }
818
819    /// Test sending and receiving an empty (zero-length) message using inline closures with poll_once.
820    #[test]
821    #[serial]
822    pub fn empty_message_test() -> Result<(), Box<dyn error::Error>> {
823        let _ = env_logger::Builder::new()
824            .is_test(true)
825            .filter_level(log::LevelFilter::Info)
826            .try_init();
827
828        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
829        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
830        media_driver_ctx.set_dir_delete_on_start(true)?;
831        media_driver_ctx.set_dir(
832            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
833        )?;
834        let (_stop, driver_handle) =
835            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
836
837        let ctx = AeronContext::new()?;
838        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
839        ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
840
841        let aeron = Aeron::new(&ctx)?;
842        aeron.start()?;
843        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
844        let subscription = aeron.add_subscription(
845            AERON_IPC_STREAM,
846            123,
847            Handlers::no_available_image_handler(),
848            Handlers::no_unavailable_image_handler(),
849            Duration::from_secs(5),
850        )?;
851
852        let empty_received = Arc::new(AtomicBool::new(false));
853        let start_time = Instant::now();
854
855        let result = publisher.offer(b"", Handlers::no_reserved_value_supplier_handler());
856        assert!(result > 0);
857
858        while !empty_received.load(Ordering::SeqCst)
859            && start_time.elapsed() < Duration::from_secs(5)
860        {
861            let _ = subscription.poll_once(
862                |msg, _header| {
863                    if msg.is_empty() {
864                        empty_received.store(true, Ordering::SeqCst);
865                    }
866                },
867                128,
868            )?;
869        }
870
871        assert!(
872            empty_received.load(Ordering::SeqCst),
873            "Empty message was not received"
874        );
875        _stop.store(true, Ordering::SeqCst);
876        let _ = driver_handle.join().unwrap();
877        Ok(())
878    }
879
880    #[test]
881    #[serial]
882    #[ignore] // need to work to get tags working properly, its more of testing issue then tag issue
883    pub fn tags() -> Result<(), Box<dyn error::Error>> {
884        let _ = env_logger::Builder::new()
885            .is_test(true)
886            .filter_level(log::LevelFilter::Debug)
887            .try_init();
888
889        let (md_ctx, stop, md) = start_media_driver(1)?;
890
891        let (_a_ctx2, aeron_sub) = create_client(&md_ctx)?;
892
893        info!("creating suscriber 1");
894        let sub = aeron_sub
895            .add_subscription(
896                &"aeron:udp?tags=100".into_c_string(),
897                123,
898                Handlers::no_available_image_handler(),
899                Handlers::no_unavailable_image_handler(),
900                Duration::from_secs(50),
901            )
902            .map_err(|e| {
903                error!("aeron error={}", Aeron::errmsg());
904                e
905            })?;
906
907        let ctx = AeronContext::new()?;
908        ctx.set_dir(&aeron_sub.context().get_dir().into_c_string())?;
909        let aeron = Aeron::new(&ctx)?;
910        aeron.start()?;
911
912        info!("creating suscriber 2");
913        let sub2 = aeron_sub.add_subscription(
914            &"aeron:udp?tags=100".into_c_string(),
915            123,
916            Handlers::no_available_image_handler(),
917            Handlers::no_unavailable_image_handler(),
918            Duration::from_secs(50),
919        )?;
920
921        let (_a_ctx1, aeron_pub) = create_client(&md_ctx)?;
922        info!("creating publisher");
923        assert!(!aeron_pub.is_closed());
924        let publisher = aeron_pub
925            .add_publication(
926                &"aeron:udp?endpoint=localhost:4040|alias=test|tags=100".into_c_string(),
927                123,
928                Duration::from_secs(5),
929            )
930            .map_err(|e| {
931                error!("aeron error={}", Aeron::errmsg());
932                e
933            })?;
934
935        info!("publishing msg");
936
937        loop {
938            let result = publisher.offer(
939                "213".as_bytes(),
940                Handlers::no_reserved_value_supplier_handler(),
941            );
942            if result < 0 {
943                error!(
944                    "failed to publish {:?}",
945                    AeronCError::from_code(result as i32)
946                );
947            } else {
948                break;
949            }
950        }
951
952        sub.poll_once(
953            |msg, _header| {
954                println!("Received message: {:?}", msg);
955            },
956            128,
957        )?;
958        sub2.poll_once(
959            |msg, _header| {
960                println!("Received message: {:?}", msg);
961            },
962            128,
963        )?;
964
965        stop.store(true, Ordering::SeqCst);
966
967        Ok(())
968    }
969
970    fn create_client(
971        media_driver_ctx: &AeronDriverContext,
972    ) -> Result<(AeronContext, Aeron), Box<dyn Error>> {
973        let dir = media_driver_ctx.get_dir();
974        info!("creating aeron client [dir={}]", dir);
975        let ctx = AeronContext::new()?;
976        ctx.set_dir(&dir.into_c_string())?;
977        ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
978        let aeron = Aeron::new(&ctx)?;
979        aeron.start()?;
980        Ok((ctx, aeron))
981    }
982
983    fn start_media_driver(
984        instance: u64,
985    ) -> Result<
986        (
987            AeronDriverContext,
988            Arc<AtomicBool>,
989            JoinHandle<Result<(), rusteron_media_driver::AeronCError>>,
990        ),
991        Box<dyn Error>,
992    > {
993        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
994        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
995        media_driver_ctx.set_dir_delete_on_start(true)?;
996        media_driver_ctx.set_dir(
997            &format!(
998                "{}{}-{}",
999                media_driver_ctx.get_dir(),
1000                Aeron::epoch_clock(),
1001                instance
1002            )
1003            .into_c_string(),
1004        )?;
1005        let (stop, driver_handle) =
1006            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1007        Ok((media_driver_ctx, stop, driver_handle))
1008    }
1009
    // Empty module whose rustdoc text is the contents of `../../README.md` (path resolved
    // relative to this source file), pulled in at compile time via `include_str!`.
    // NOTE(review): presumably meant to have the README's code samples checked as
    // doctests; this module lives inside the `#[cfg(test)]` `tests` module, so confirm
    // that rustdoc actually picks it up.
    #[doc = include_str!("../../README.md")]
    mod readme_tests {}
1012}