rusteron_client/
lib.rs

1#![allow(non_upper_case_globals)]
2#![allow(non_camel_case_types)]
3#![allow(non_snake_case)]
4#![allow(clippy::all)]
5#![allow(unused_unsafe)]
6#![allow(unused_variables)]
7#![doc = include_str!("../README.md")]
8//! # Features
9//!
10//! - **`static`**: When enabled, this feature statically links the Aeron C code.
11//!   By default, the library uses dynamic linking to the Aeron C libraries.
12//! - **`backtrace`** - When enabled will log a backtrace for each AeronCError
13//! - **`extra-logging`** - When enabled, logs whenever a resource is created or destroyed. Useful if you're seeing a segfault due to a resource being closed.
14//! - **`precompile`** - When enabled, uses precompiled C code instead of requiring cmake and java to be installed.
15
/// Bindings module whose entire body is pulled in from a generated file.
///
/// The contents come from `bindings.rs` inside the build's `OUT_DIR`; nothing is
/// hand-written here. (Presumably produced by the crate's build script — confirm.)
16pub mod bindings {
    // Splice the generated source in at compile time.
17    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
18}
19
20use bindings::*;
21
22include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
23include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
24
25#[cfg(test)]
26mod tests {
27    use super::*;
28    use crate::test_alloc::current_allocs;
29    use log::{error, info};
30    use rusteron_media_driver::AeronDriverContext;
31    use serial_test::serial;
32    use std::error;
33    use std::error::Error;
34    use std::io::Write;
35    use std::os::raw::c_int;
36    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
37    use std::sync::Arc;
38    use std::thread::{sleep, JoinHandle};
39    use std::time::{Duration, Instant};
40
41    #[derive(Default, Debug)]
42    struct ErrorCount {
43        error_count: usize,
44    }
45
46    impl AeronErrorHandlerCallback for ErrorCount {
47        fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
48            error!("Aeron error {}: {}", error_code, msg);
49            self.error_count += 1;
50        }
51    }
52
53    #[test]
54    #[serial]
55    fn version_check() -> Result<(), Box<dyn error::Error>> {
56        unsafe {
57            aeron_randomised_int32();
58        }
59        let alloc_count = current_allocs();
60
61        {
62            let major = unsafe { crate::aeron_version_major() };
63            let minor = unsafe { crate::aeron_version_minor() };
64            let patch = unsafe { crate::aeron_version_patch() };
65
66            let cargo_version = "1.48.5";
67            let aeron_version = format!("{}.{}.{}", major, minor, patch);
68            assert_eq!(aeron_version, cargo_version);
69
70            let ctx = AeronContext::new()?;
71            let error_count = 1;
72            let mut handler = Handler::leak(ErrorCount::default());
73            ctx.set_error_handler(Some(&handler))?;
74
75            assert!(Aeron::epoch_clock() > 0);
76            drop(ctx);
77            assert!(handler.should_drop);
78            handler.release();
79            assert!(!handler.should_drop);
80            drop(handler);
81        }
82
83        assert!(
84            current_allocs() <= alloc_count,
85            "allocations {} > {alloc_count}",
86            current_allocs()
87        );
88
89        Ok(())
90    }
91
92    #[test]
93    #[serial]
94    pub fn simple_large_send() -> Result<(), Box<dyn error::Error>> {
95        let _ = env_logger::Builder::new()
96            .is_test(true)
97            .filter_level(log::LevelFilter::Info)
98            .try_init();
99        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
100        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
101        media_driver_ctx.set_dir_delete_on_start(true)?;
102        media_driver_ctx.set_dir(
103            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
104        )?;
105        let (stop, driver_handle) =
106            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
107
108        let ctx = AeronContext::new()?;
109        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
110        assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
111        let error_count = 1;
112        ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
113        ctx.set_on_new_publication(Some(&Handler::leak(AeronNewPublicationLogger)))?;
114        ctx.set_on_available_counter(Some(&Handler::leak(AeronAvailableCounterLogger)))?;
115        ctx.set_on_close_client(Some(&Handler::leak(AeronCloseClientLogger)))?;
116        ctx.set_on_new_subscription(Some(&Handler::leak(AeronNewSubscriptionLogger)))?;
117        ctx.set_on_unavailable_counter(Some(&Handler::leak(AeronUnavailableCounterLogger)))?;
118        ctx.set_on_available_counter(Some(&Handler::leak(AeronAvailableCounterLogger)))?;
119        ctx.set_on_new_exclusive_publication(Some(&Handler::leak(AeronNewPublicationLogger)))?;
120
121        info!("creating client [simple_large_send test]");
122        let aeron = Aeron::new(&ctx)?;
123        info!("starting client");
124
125        aeron.start()?;
126        info!("client started");
127        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
128        info!("created publisher");
129
130        assert!(AeronCncMetadata::load_from_file(ctx.get_dir())?.pid > 0);
131        let cstr = std::ffi::CString::new(ctx.get_dir()).unwrap();
132        AeronCncMetadata::read_from_file(&cstr, |cnc| {
133            assert!(cnc.pid > 0);
134        })?;
135        assert!(AeronCnc::new_on_heap(ctx.get_dir())?.get_to_driver_heartbeat_ms()? > 0);
136        let cstr = std::ffi::CString::new(ctx.get_dir()).unwrap();
137        for _ in 0..50 {
138            AeronCnc::read_on_partial_stack(&cstr, |cnc| {
139                assert!(cnc.get_to_driver_heartbeat_ms().unwrap() > 0);
140            })?;
141        }
142
143        let subscription = aeron.add_subscription(
144            AERON_IPC_STREAM,
145            123,
146            Handlers::no_available_image_handler(),
147            Handlers::no_unavailable_image_handler(),
148            Duration::from_secs(5),
149        )?;
150        info!("created subscription");
151
152        subscription
153            .poll_once(|msg, header| println!("foo"), 1024)
154            .unwrap();
155
156        // pick a large enough size to confirm fragement assembler is working
157        let string_len = media_driver_ctx.ipc_mtu_length * 100;
158        info!("string length: {}", string_len);
159
160        let publisher_handler = {
161            let stop = stop.clone();
162            std::thread::spawn(move || {
163                let binding = "1".repeat(string_len);
164                let large_msg = binding.as_bytes();
165                loop {
166                    if stop.load(Ordering::Acquire) || publisher.is_closed() {
167                        break;
168                    }
169                    let result =
170                        publisher.offer(large_msg, Handlers::no_reserved_value_supplier_handler());
171
172                    assert_eq!(123, publisher.get_constants().unwrap().stream_id);
173
174                    if result < large_msg.len() as i64 {
175                        let error = AeronCError::from_code(result as i32);
176                        match error.kind() {
177                            AeronErrorType::PublicationBackPressured
178                            | AeronErrorType::PublicationAdminAction => {
179                                // ignore
180                            }
181                            _ => {
182                                error!(
183                                    "ERROR: failed to send message {:?}",
184                                    AeronCError::from_code(result as i32)
185                                );
186                            }
187                        }
188                        sleep(Duration::from_millis(500));
189                    }
190                }
191                info!("stopping publisher thread");
192            })
193        };
194
195        let mut assembler = AeronFragmentClosureAssembler::new()?;
196
197        struct Context {
198            count: Arc<AtomicUsize>,
199            stop: Arc<AtomicBool>,
200            string_len: usize,
201        }
202
203        let count = Arc::new(AtomicUsize::new(0usize));
204        let mut context = Context {
205            count: count.clone(),
206            stop: stop.clone(),
207            string_len,
208        };
209
210        // Start the timer
211        let start_time = Instant::now();
212
213        loop {
214            if start_time.elapsed() > Duration::from_secs(30) {
215                info!("Failed: exceeded 30-second timeout");
216                return Err(Box::new(std::io::Error::new(
217                    std::io::ErrorKind::TimedOut,
218                    "Timeout exceeded",
219                )));
220            }
221            let c = count.load(Ordering::SeqCst);
222            if c > 100 {
223                break;
224            }
225
226            fn process_msg(ctx: &mut Context, buffer: &[u8], header: AeronHeader) {
227                ctx.count.fetch_add(1, Ordering::SeqCst);
228
229                let values = header.get_values().unwrap();
230                assert_ne!(values.frame.session_id, 0);
231
232                if buffer.len() != ctx.string_len {
233                    ctx.stop.store(true, Ordering::SeqCst);
234                    error!(
235                        "ERROR: message was {} but was expecting {} [header={:?}]",
236                        buffer.len(),
237                        ctx.string_len,
238                        header
239                    );
240                    sleep(Duration::from_secs(1));
241                }
242
243                assert_eq!(buffer.len(), ctx.string_len);
244                assert_eq!(buffer, "1".repeat(ctx.string_len).as_bytes());
245            }
246
247            subscription.poll(assembler.process(&mut context, process_msg), 128)?;
248            assert_eq!(123, subscription.get_constants().unwrap().stream_id);
249        }
250
251        subscription.close(Handlers::no_notification_handler())?;
252
253        info!("stopping client");
254        stop.store(true, Ordering::SeqCst);
255
256        let _ = publisher_handler.join().unwrap();
257        let _ = driver_handle.join().unwrap();
258
259        let cnc = AeronCnc::new_on_heap(ctx.get_dir())?;
260        cnc.counters_reader().foreach_counter_once(
261            |value: i64, id: i32, type_id: i32, key: &[u8], label: &str| {
262                println!("counter reader id={id}, type_id={type_id}, key={key:?}, label={label}, value={value} [type={:?}]",
263                AeronSystemCounterType::try_from(type_id));
264            },
265        );
266        cnc.error_log_read_once(| observation_count: i32,
267                                     first_observation_timestamp: i64,
268                                     last_observation_timestamp: i64,
269                                     error: &str| {
270            println!("error: {error} observationCount={observation_count}, first_observation_timestamp={first_observation_timestamp}, last_observation_timestamp={last_observation_timestamp}");
271        }, 0);
272        cnc.loss_reporter_read_once(|    observation_count: i64,
273                                    total_bytes_lost: i64,
274                                    first_observation_timestamp: i64,
275                                    last_observation_timestamp: i64,
276                                    session_id: i32,
277                                    stream_id: i32,
278                                    channel: &str,
279                                    source: &str,| {
280            println!("loss reporter observationCount={observation_count}, totalBytesLost={total_bytes_lost}, first_observed={first_observation_timestamp}, last_observed={last_observation_timestamp}, session_id={session_id}, stream_id={stream_id}, channel={channel} source={source}");
281        })?;
282
283        Ok(())
284    }
285
286    #[test]
287    #[serial]
288    pub fn try_claim() -> Result<(), Box<dyn error::Error>> {
289        let _ = env_logger::Builder::new()
290            .is_test(true)
291            .filter_level(log::LevelFilter::Info)
292            .try_init();
293        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
294        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
295        media_driver_ctx.set_dir_delete_on_start(true)?;
296        media_driver_ctx.set_dir(
297            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
298        )?;
299        let (stop, driver_handle) =
300            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
301
302        let ctx = AeronContext::new()?;
303        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
304        assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
305        ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
306
307        info!("creating client [try_claim test]");
308        let aeron = Aeron::new(&ctx)?;
309        info!("starting client");
310
311        aeron.start()?;
312        info!("client started");
313        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
314        info!("created publisher");
315
316        let subscription = aeron.add_subscription(
317            AERON_IPC_STREAM,
318            123,
319            Handlers::no_available_image_handler(),
320            Handlers::no_unavailable_image_handler(),
321            Duration::from_secs(5),
322        )?;
323        info!("created subscription");
324
325        // pick a large enough size to confirm fragement assembler is working
326        let string_len = 156;
327        info!("string length: {}", string_len);
328
329        let publisher_handler = {
330            let stop = stop.clone();
331            std::thread::spawn(move || {
332                let binding = "1".repeat(string_len);
333                let msg = binding.as_bytes();
334                let buffer = AeronBufferClaim::default();
335                loop {
336                    if stop.load(Ordering::Acquire) || publisher.is_closed() {
337                        break;
338                    }
339
340                    let result = publisher.try_claim(string_len, &buffer);
341
342                    if result < msg.len() as i64 {
343                        error!(
344                            "ERROR: failed to send message {:?}",
345                            AeronCError::from_code(result as i32)
346                        );
347                    } else {
348                        buffer.data().write_all(&msg).unwrap();
349                        buffer.commit().unwrap();
350                    }
351                }
352                info!("stopping publisher thread");
353            })
354        };
355
356        let count = Arc::new(AtomicUsize::new(0usize));
357        let count_copy = Arc::clone(&count);
358        let stop2 = stop.clone();
359
360        struct FragmentHandler {
361            count_copy: Arc<AtomicUsize>,
362            stop2: Arc<AtomicBool>,
363            string_len: usize,
364        }
365
366        impl AeronFragmentHandlerCallback for FragmentHandler {
367            fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], header: AeronHeader) {
368                self.count_copy.fetch_add(1, Ordering::SeqCst);
369
370                if buffer.len() != self.string_len {
371                    self.stop2.store(true, Ordering::SeqCst);
372                    error!(
373                        "ERROR: message was {} but was expecting {} [header={:?}]",
374                        buffer.len(),
375                        self.string_len,
376                        header
377                    );
378                    sleep(Duration::from_secs(1));
379                }
380
381                assert_eq!(buffer.len(), self.string_len);
382                assert_eq!(buffer, "1".repeat(self.string_len).as_bytes());
383            }
384        }
385
386        let (closure, _inner) = Handler::leak_with_fragment_assembler(FragmentHandler {
387            count_copy,
388            stop2,
389            string_len,
390        })?;
391        let start_time = Instant::now();
392
393        loop {
394            if start_time.elapsed() > Duration::from_secs(30) {
395                info!("Failed: exceeded 30-second timeout");
396                return Err(Box::new(std::io::Error::new(
397                    std::io::ErrorKind::TimedOut,
398                    "Timeout exceeded",
399                )));
400            }
401            let c = count.load(Ordering::SeqCst);
402            if c > 100 {
403                break;
404            }
405            subscription.poll(Some(&closure), 128)?;
406        }
407
408        info!("stopping client");
409
410        stop.store(true, Ordering::SeqCst);
411
412        let _ = publisher_handler.join().unwrap();
413        let _ = driver_handle.join().unwrap();
414        Ok(())
415    }
416
417    #[test]
418    #[serial]
419    pub fn counters() -> Result<(), Box<dyn error::Error>> {
420        let _ = env_logger::Builder::new()
421            .is_test(true)
422            .filter_level(log::LevelFilter::Info)
423            .try_init();
424        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
425        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
426        media_driver_ctx.set_dir_delete_on_start(true)?;
427        media_driver_ctx.set_dir(
428            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
429        )?;
430        let (stop, driver_handle) =
431            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
432
433        let ctx = AeronContext::new()?;
434        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
435        assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
436        ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
437        ctx.set_on_unavailable_counter(Some(&Handler::leak(AeronUnavailableCounterLogger)))?;
438
439        struct AvailableCounterHandler {
440            found_counter: bool,
441        }
442
443        impl AeronAvailableCounterCallback for AvailableCounterHandler {
444            fn handle_aeron_on_available_counter(
445                &mut self,
446                counters_reader: AeronCountersReader,
447                registration_id: i64,
448                counter_id: i32,
449            ) -> () {
450                info!(
451            "on counter key={:?}, label={:?} registration_id={registration_id}, counter_id={counter_id}, value={}, {counters_reader:?}",
452            String::from_utf8(counters_reader.get_counter_key(counter_id).unwrap()),
453            counters_reader.get_counter_label(counter_id, 1000),
454            counters_reader.addr(counter_id)
455        );
456
457                assert_eq!(
458                    counters_reader.counter_registration_id(counter_id).unwrap(),
459                    registration_id
460                );
461
462                if let Ok(label) = counters_reader.get_counter_label(counter_id, 1000) {
463                    if label == "label_buffer" {
464                        self.found_counter = true;
465                        assert_eq!(
466                            &counters_reader.get_counter_key(counter_id).unwrap(),
467                            "key".as_bytes()
468                        );
469                    }
470                }
471            }
472        }
473
474        let handler = &Handler::leak(AvailableCounterHandler {
475            found_counter: false,
476        });
477        ctx.set_on_available_counter(Some(handler))?;
478
479        info!("creating client");
480        let aeron = Aeron::new(&ctx)?;
481        info!("starting client");
482
483        aeron.start()?;
484        info!("client started [counters test]");
485
486        let counter = aeron.add_counter(
487            123,
488            "key".as_bytes(),
489            "label_buffer",
490            Duration::from_secs(5),
491        )?;
492        let constants = counter.get_constants()?;
493        let counter_id = constants.counter_id;
494
495        let publisher_handler = {
496            let stop = stop.clone();
497            let counter = counter.clone();
498            std::thread::spawn(move || {
499                for _ in 0..150 {
500                    if stop.load(Ordering::Acquire) || counter.is_closed() {
501                        break;
502                    }
503                    counter.addr_atomic().fetch_add(1, Ordering::SeqCst);
504                }
505                info!("stopping publisher thread");
506            })
507        };
508
509        let now = Instant::now();
510        while counter.addr_atomic().load(Ordering::SeqCst) < 100
511            && now.elapsed() < Duration::from_secs(10)
512        {
513            sleep(Duration::from_micros(10));
514        }
515
516        assert!(now.elapsed() < Duration::from_secs(10));
517
518        info!(
519            "counter is {}",
520            counter.addr_atomic().load(Ordering::SeqCst)
521        );
522
523        info!("stopping client");
524
525        #[cfg(not(target_os = "windows"))] // not sure why windows version doesn't fire event
526        assert!(handler.found_counter);
527
528        stop.store(true, Ordering::SeqCst);
529
530        let reader = aeron.counters_reader();
531        assert_eq!(reader.get_counter_label(counter_id, 256)?, "label_buffer");
532        assert_eq!(reader.get_counter_key(counter_id)?, "key".as_bytes());
533        let buffers = AeronCountersReaderBuffers::default();
534        reader.get_buffers(&buffers)?;
535
536        let _ = publisher_handler.join().unwrap();
537        let _ = driver_handle.join().unwrap();
538        Ok(())
539    }
540
541    /// A simple error counter for testing error callback invocation.
542    #[derive(Default, Debug)]
543    struct TestErrorCount {
544        pub error_count: usize,
545    }
546
547    impl Drop for TestErrorCount {
548        fn drop(&mut self) {
549            info!("TestErrorCount dropped with {} errors", self.error_count);
550        }
551    }
552
553    impl AeronErrorHandlerCallback for TestErrorCount {
554        fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
555            error!("Aeron error {}: {}", error_code, msg);
556            self.error_count += 1;
557        }
558    }
559
560    #[test]
561    #[serial]
562    pub fn backpressure_recovery_test() -> Result<(), Box<dyn error::Error>> {
563        let _ = env_logger::Builder::new()
564            .is_test(true)
565            .filter_level(log::LevelFilter::Info)
566            .try_init();
567
568        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
569        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
570        media_driver_ctx.set_dir_delete_on_start(true)?;
571        media_driver_ctx.set_dir(
572            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
573        )?;
574        let (stop, driver_handle) =
575            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
576
577        let ctx = AeronContext::new()?;
578        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
579        ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
580
581        let aeron = Aeron::new(&ctx)?;
582        aeron.start()?;
583
584        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
585        let subscription = aeron.add_subscription(
586            AERON_IPC_STREAM,
587            123,
588            Handlers::no_available_image_handler(),
589            Handlers::no_unavailable_image_handler(),
590            Duration::from_secs(5),
591        )?;
592
593        let count = Arc::new(AtomicUsize::new(0));
594        let start_time = Instant::now();
595
596        // Spawn a publisher thread that repeatedly sends "test" messages.
597        let publisher_thread = {
598            let stop = stop.clone();
599            std::thread::spawn(move || {
600                while !stop.load(Ordering::Acquire) {
601                    let msg = b"test";
602                    let result =
603                        publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
604                    // If backpressure is encountered, sleep a bit.
605                    if result == AeronErrorType::PublicationBackPressured.code() as i64 {
606                        sleep(Duration::from_millis(50));
607                    }
608                }
609            })
610        };
611
612        // Poll using the inline closure via poll_once until we receive at least 50 messages.
613        while count.load(Ordering::SeqCst) < 50 && start_time.elapsed() < Duration::from_secs(10) {
614            let _ = subscription.poll_once(
615                |_msg, _header| {
616                    count.fetch_add(1, Ordering::SeqCst);
617                },
618                128,
619            )?;
620        }
621
622        stop.store(true, Ordering::SeqCst);
623        publisher_thread.join().unwrap();
624        let _ = driver_handle.join().unwrap();
625
626        assert!(
627            count.load(Ordering::SeqCst) >= 50,
628            "Expected at least 50 messages received"
629        );
630        Ok(())
631    }
632
633    #[test]
634    #[serial]
635    pub fn multi_subscription_test() -> Result<(), Box<dyn error::Error>> {
636        let _ = env_logger::Builder::new()
637            .is_test(true)
638            .filter_level(log::LevelFilter::Info)
639            .try_init();
640
641        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
642        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
643        media_driver_ctx.set_dir_delete_on_start(true)?;
644        media_driver_ctx.set_dir(
645            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
646        )?;
647        let (_stop, driver_handle) =
648            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
649
650        let ctx = AeronContext::new()?;
651        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
652        ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
653
654        let aeron = Aeron::new(&ctx)?;
655        aeron.start()?;
656        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
657
658        // Create two subscriptions on the same channel.
659        let subscription1 = aeron.add_subscription(
660            AERON_IPC_STREAM,
661            123,
662            Handlers::no_available_image_handler(),
663            Handlers::no_unavailable_image_handler(),
664            Duration::from_secs(5),
665        )?;
666        let subscription2 = aeron.add_subscription(
667            AERON_IPC_STREAM,
668            123,
669            Handlers::no_available_image_handler(),
670            Handlers::no_unavailable_image_handler(),
671            Duration::from_secs(5),
672        )?;
673
674        let count1 = Arc::new(AtomicUsize::new(0));
675        let count2 = Arc::new(AtomicUsize::new(0));
676
677        // Publish a single message.
678        let msg = b"hello multi-subscription";
679        let result = publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
680        assert!(
681            result >= msg.len() as i64,
682            "Message should be sent successfully"
683        );
684
685        let start_time = Instant::now();
686        // Poll both subscriptions with inline closures until each has received at least one message.
687        while (count1.load(Ordering::SeqCst) < 1 || count2.load(Ordering::SeqCst) < 1)
688            && start_time.elapsed() < Duration::from_secs(5)
689        {
690            let _ = subscription1.poll_once(
691                |_msg, _header| {
692                    count1.fetch_add(1, Ordering::SeqCst);
693                },
694                128,
695            )?;
696            let _ = subscription2.poll_once(
697                |_msg, _header| {
698                    count2.fetch_add(1, Ordering::SeqCst);
699                },
700                128,
701            )?;
702        }
703
704        assert!(
705            count1.load(Ordering::SeqCst) >= 1,
706            "Subscription 1 did not receive the message"
707        );
708        assert!(
709            count2.load(Ordering::SeqCst) >= 1,
710            "Subscription 2 did not receive the message"
711        );
712
713        _stop.store(true, Ordering::SeqCst);
714        let _ = driver_handle.join().unwrap();
715        Ok(())
716    }
717
718    #[test]
719    #[serial]
720    pub fn should_be_able_to_drop_after_close_manually_being_closed(
721    ) -> Result<(), Box<dyn error::Error>> {
722        let _ = env_logger::Builder::new()
723            .is_test(true)
724            .filter_level(log::LevelFilter::Info)
725            .try_init();
726
727        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
728        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
729        media_driver_ctx.set_dir_delete_on_start(true)?;
730        media_driver_ctx.set_dir(
731            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
732        )?;
733        let (_stop, driver_handle) =
734            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
735
736        let ctx = AeronContext::new()?;
737        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
738        ctx.set_error_handler(Some(&Handler::leak(AeronErrorHandlerLogger)))?;
739
740        let aeron = Aeron::new(&ctx)?;
741        aeron.start()?;
742
743        {
744            let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
745            info!("created publication [sessionId={}]", publisher.session_id());
746            publisher.close_with_no_args()?;
747            drop(publisher);
748        }
749
750        {
751            let publisher = aeron.add_publication(AERON_IPC_STREAM, 124, Duration::from_secs(5))?;
752            info!("created publication [sessionId={}]", publisher.session_id());
753            publisher.close(Handlers::no_notification_handler())?;
754            drop(publisher);
755        }
756
757        {
758            let publisher = aeron.add_publication(AERON_IPC_STREAM, 125, Duration::from_secs(5))?;
759            publisher.close_once(|| println!("on close"))?;
760            info!("created publication [sessionId={}]", publisher.session_id());
761            drop(publisher);
762        }
763
764        Ok(())
765    }
766
767    #[test]
768    #[serial]
769    pub fn offer_on_closed_publication_error_test() -> Result<(), Box<dyn error::Error>> {
770        let _ = env_logger::Builder::new()
771            .is_test(true)
772            .filter_level(log::LevelFilter::Info)
773            .try_init();
774
775        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
776        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
777        media_driver_ctx.set_dir_delete_on_start(true)?;
778        media_driver_ctx.set_dir(
779            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
780        )?;
781        let (_stop, driver_handle) =
782            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
783
784        let ctx = AeronContext::new()?;
785        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
786        ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
787
788        let aeron = Aeron::new(&ctx)?;
789        aeron.start()?;
790        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
791
792        // Close the publication immediately.
793        publisher.close(Handlers::no_notification_handler())?;
794
795        // Attempt to send a message after the publication is closed.
796        let result = publisher.offer(
797            b"should fail",
798            Handlers::no_reserved_value_supplier_handler(),
799        );
800        assert!(
801            result < 0,
802            "Offering on a closed publication should return a negative error code"
803        );
804
805        _stop.store(true, Ordering::SeqCst);
806        let _ = driver_handle.join().unwrap();
807        Ok(())
808    }
809
810    /// Test sending and receiving an empty (zero-length) message using inline closures with poll_once.
811    #[test]
812    #[serial]
813    pub fn empty_message_test() -> Result<(), Box<dyn error::Error>> {
814        let _ = env_logger::Builder::new()
815            .is_test(true)
816            .filter_level(log::LevelFilter::Info)
817            .try_init();
818
819        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
820        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
821        media_driver_ctx.set_dir_delete_on_start(true)?;
822        media_driver_ctx.set_dir(
823            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
824        )?;
825        let (_stop, driver_handle) =
826            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
827
828        let ctx = AeronContext::new()?;
829        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
830        ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
831
832        let aeron = Aeron::new(&ctx)?;
833        aeron.start()?;
834        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
835        let subscription = aeron.add_subscription(
836            AERON_IPC_STREAM,
837            123,
838            Handlers::no_available_image_handler(),
839            Handlers::no_unavailable_image_handler(),
840            Duration::from_secs(5),
841        )?;
842
843        let empty_received = Arc::new(AtomicBool::new(false));
844        let start_time = Instant::now();
845
846        let result = publisher.offer(b"", Handlers::no_reserved_value_supplier_handler());
847        assert!(result > 0);
848
849        while !empty_received.load(Ordering::SeqCst)
850            && start_time.elapsed() < Duration::from_secs(5)
851        {
852            let _ = subscription.poll_once(
853                |msg, _header| {
854                    if msg.is_empty() {
855                        empty_received.store(true, Ordering::SeqCst);
856                    }
857                },
858                128,
859            )?;
860        }
861
862        assert!(
863            empty_received.load(Ordering::SeqCst),
864            "Empty message was not received"
865        );
866        _stop.store(true, Ordering::SeqCst);
867        let _ = driver_handle.join().unwrap();
868        Ok(())
869    }
870
871    #[test]
872    #[serial]
873    #[ignore] // need to work to get tags working properly, its more of testing issue then tag issue
874    pub fn tags() -> Result<(), Box<dyn error::Error>> {
875        let _ = env_logger::Builder::new()
876            .is_test(true)
877            .filter_level(log::LevelFilter::Debug)
878            .try_init();
879
880        let (md_ctx, stop, md) = start_media_driver(1)?;
881
882        let (_a_ctx2, aeron_sub) = create_client(&md_ctx)?;
883
884        info!("creating suscriber 1");
885        let sub = aeron_sub
886            .add_subscription(
887                &"aeron:udp?tags=100".into_c_string(),
888                123,
889                Handlers::no_available_image_handler(),
890                Handlers::no_unavailable_image_handler(),
891                Duration::from_secs(50),
892            )
893            .map_err(|e| {
894                error!("aeron error={}", Aeron::errmsg());
895                e
896            })?;
897
898        let ctx = AeronContext::new()?;
899        ctx.set_dir(&aeron_sub.context().get_dir().into_c_string())?;
900        let aeron = Aeron::new(&ctx)?;
901        aeron.start()?;
902
903        info!("creating suscriber 2");
904        let sub2 = aeron_sub.add_subscription(
905            &"aeron:udp?tags=100".into_c_string(),
906            123,
907            Handlers::no_available_image_handler(),
908            Handlers::no_unavailable_image_handler(),
909            Duration::from_secs(50),
910        )?;
911
912        let (_a_ctx1, aeron_pub) = create_client(&md_ctx)?;
913        info!("creating publisher");
914        assert!(!aeron_pub.is_closed());
915        let publisher = aeron_pub
916            .add_publication(
917                &"aeron:udp?endpoint=localhost:4040|alias=test|tags=100".into_c_string(),
918                123,
919                Duration::from_secs(5),
920            )
921            .map_err(|e| {
922                error!("aeron error={}", Aeron::errmsg());
923                e
924            })?;
925
926        info!("publishing msg");
927
928        loop {
929            let result = publisher.offer(
930                "213".as_bytes(),
931                Handlers::no_reserved_value_supplier_handler(),
932            );
933            if result < 0 {
934                error!(
935                    "failed to publish {:?}",
936                    AeronCError::from_code(result as i32)
937                );
938            } else {
939                break;
940            }
941        }
942
943        sub.poll_once(
944            |msg, _header| {
945                println!("Received message: {:?}", msg);
946            },
947            128,
948        )?;
949        sub2.poll_once(
950            |msg, _header| {
951                println!("Received message: {:?}", msg);
952            },
953            128,
954        )?;
955
956        stop.store(true, Ordering::SeqCst);
957
958        Ok(())
959    }
960
961    fn create_client(
962        media_driver_ctx: &AeronDriverContext,
963    ) -> Result<(AeronContext, Aeron), Box<dyn Error>> {
964        let dir = media_driver_ctx.get_dir();
965        info!("creating aeron client [dir={}]", dir);
966        let ctx = AeronContext::new()?;
967        ctx.set_dir(&dir.into_c_string())?;
968        ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
969        let aeron = Aeron::new(&ctx)?;
970        aeron.start()?;
971        Ok((ctx, aeron))
972    }
973
974    fn start_media_driver(
975        instance: u64,
976    ) -> Result<
977        (
978            AeronDriverContext,
979            Arc<AtomicBool>,
980            JoinHandle<Result<(), rusteron_media_driver::AeronCError>>,
981        ),
982        Box<dyn Error>,
983    > {
984        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
985        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
986        media_driver_ctx.set_dir_delete_on_start(true)?;
987        media_driver_ctx.set_dir(
988            &format!(
989                "{}{}-{}",
990                media_driver_ctx.get_dir(),
991                Aeron::epoch_clock(),
992                instance
993            )
994            .into_c_string(),
995        )?;
996        let (stop, driver_handle) =
997            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
998        Ok((media_driver_ctx, stop, driver_handle))
999    }
1000
    // Empty module whose only content is its documentation: the README located two
    // directories above this file is attached as the module's doc attribute.
    // NOTE(review): presumably meant to get the README's code examples checked as doc tests;
    // confirm this still happens given the module sits inside the `#[cfg(test)]` tests module.
    #[doc = include_str!("../../README.md")]
    mod readme_tests {}
1003}