1#![allow(improper_ctypes_definitions)]
2#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
9#[allow(improper_ctypes_definitions)]
19#[allow(unpredictable_function_pointer_comparisons)]
20pub mod bindings {
21 include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
22}
23
24use bindings::*;
25
26include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
27include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
28
29#[cfg(test)]
30mod tests {
31 use super::*;
32 use crate::test_alloc::current_allocs;
33 use hdrhistogram::Histogram;
34 use log::{error, info};
35 use rusteron_media_driver::AeronDriverContext;
36 use serial_test::serial;
37 use std::error;
38 use std::error::Error;
39 use std::io::Write;
40 use std::os::raw::c_int;
41 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
42 use std::sync::Arc;
43 use std::thread::{sleep, JoinHandle};
44 use std::time::{Duration, Instant};
45
46 #[derive(Default, Debug)]
47 struct ErrorCount {
48 error_count: usize,
49 }
50
51 impl AeronErrorHandlerCallback for ErrorCount {
52 fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
53 error!("Aeron error {}: {}", error_code, msg);
54 self.error_count += 1;
55 }
56 }
57
58 fn running_under_valgrind() -> bool {
59 std::env::var_os("RUSTERON_VALGRIND").is_some()
60 }
61
62 #[test]
63 #[serial]
64 fn version_check() -> Result<(), Box<dyn error::Error>> {
65 unsafe {
66 aeron_randomised_int32();
67 }
68 let alloc_count = current_allocs();
69
70 {
71 let major = unsafe { crate::aeron_version_major() };
72 let minor = unsafe { crate::aeron_version_minor() };
73 let patch = unsafe { crate::aeron_version_patch() };
74
75 let cargo_version = "1.50.2";
76 let aeron_version = format!("{}.{}.{}", major, minor, patch);
77 assert_eq!(aeron_version, cargo_version);
78
79 let ctx = AeronContext::new()?;
80 let error_count = 1;
81 let mut handler = Handler::leak(ErrorCount::default());
82 ctx.set_error_handler(Some(&handler))?;
83
84 assert!(Aeron::epoch_clock() > 0);
85 drop(ctx);
86 assert!(handler.should_drop);
87 handler.release();
88 assert!(!handler.should_drop);
89 drop(handler);
90 }
91
92 assert!(
93 current_allocs() <= alloc_count,
94 "allocations {} > {alloc_count}",
95 current_allocs()
96 );
97
98 Ok(())
99 }
100
101 #[test]
102 #[serial]
103 fn async_publication_invalid_interface_poll_then_drop() -> Result<(), Box<dyn error::Error>> {
104 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
105
106 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
107 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
108 media_driver_ctx.set_dir_delete_on_start(true)?;
109 media_driver_ctx.set_dir(
110 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
111 )?;
112 let (stop, driver_handle) =
113 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
114
115 let ctx = AeronContext::new()?;
116 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
117 let mut error_handler = Handler::leak(ErrorCount::default());
118 ctx.set_error_handler(Some(&error_handler))?;
119 let aeron = Aeron::new(&ctx)?;
120 aeron.start()?;
121
122 let channel = String::from("aeron:udp?endpoint=203.0.113.1:54321");
123
124 let pub_poller = aeron.async_add_publication(&channel.clone().into_c_string(), 4321)?;
127 let sub_poller = aeron.async_add_subscription(
128 &channel.into_c_string(),
129 4321,
130 Handlers::no_available_image_handler(),
131 Handlers::no_unavailable_image_handler(),
132 )?;
133
134 let mut publication: Option<AeronPublication> = None;
135 let mut subscription: Option<AeronSubscription> = None;
136 let start = Instant::now();
137 while start.elapsed() < Duration::from_secs(2) {
138 if publication.is_none() {
139 match pub_poller.poll() {
140 Ok(Some(p)) => publication = Some(p),
141 Ok(None) | Err(_) => {}
142 }
143 }
144 if subscription.is_none() {
145 match sub_poller.poll() {
146 Ok(Some(s)) => subscription = Some(s),
147 Ok(None) | Err(_) => {}
148 }
149 }
150 if publication.is_some() && subscription.is_some() {
151 break;
152 }
153 #[cfg(debug_assertions)]
154 std::thread::sleep(Duration::from_millis(10));
155 }
156
157 info!("publication: {:?}", publication);
158 info!("subscription: {:?}", subscription);
159
160 if let (Some(publisher), Some(subscription)) = (publication, subscription) {
161 let payload = b"hello-aeron";
162 let send_start = Instant::now();
163 let mut sent = false;
164 while send_start.elapsed() < Duration::from_millis(500) {
165 let res = publisher.offer(payload, Handlers::no_reserved_value_supplier_handler());
166 if res >= payload.len() as i64 {
167 sent = true;
168 info!("sent {:?}", payload);
169 break;
170 }
171 std::thread::sleep(Duration::from_millis(10));
172 }
173
174 if sent {
175 let mut got = false;
176 let read_start = Instant::now();
177 while read_start.elapsed() < Duration::from_millis(500) {
178 let _ = subscription.poll_once(
179 |msg, _hdr| {
180 if msg == payload {
181 got = true;
182 info!("received {:?}", payload);
183 }
184 },
185 1024,
186 );
187 if got {
188 break;
189 }
190 std::thread::sleep(Duration::from_millis(10));
191 }
192 }
194 }
195
196 drop(aeron);
198 stop.store(true, Ordering::SeqCst);
199 let _ = driver_handle.join().unwrap();
200 error_handler.release();
201 Ok(())
202 }
203
204 #[test]
205 #[serial]
206 fn async_pub_sub_invalid_endpoint_create_drop_stress() -> Result<(), Box<dyn error::Error>> {
207 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
208
209 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
210 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
211 media_driver_ctx.set_dir_delete_on_start(true)?;
212 media_driver_ctx.set_dir(
213 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
214 )?;
215 let (stop, driver_handle) =
216 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
217
218 let ctx = AeronContext::new()?;
219 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
220 let mut error_handler = Handler::leak(ErrorCount::default());
221 ctx.set_error_handler(Some(&error_handler))?;
222 let aeron = Aeron::new(&ctx)?;
223 aeron.start()?;
224
225 const STRESS_ITERS: u16 = 60;
226 const POLL_TIMEOUT: Duration = Duration::from_secs(10);
227 const POLL_SLEEP: Duration = Duration::from_millis(10);
228
229 for i in 0..STRESS_ITERS {
232 let port = 55000u16 + i;
233 let channel = format!("aeron:udp?endpoint=203.0.113.1:{}", port);
234 let pub_poller =
235 aeron.async_add_publication(&channel.clone().into_c_string(), 4500 + i as i32)?;
236 let sub_poller = aeron.async_add_subscription(
237 &channel.into_c_string(),
238 4500 + i as i32,
239 Handlers::no_available_image_handler(),
240 Handlers::no_unavailable_image_handler(),
241 )?;
242
243 let start = Instant::now();
244 let mut publication = None;
245 let mut publication_done = false;
246 let mut subscription = None;
247 let mut subscription_done = false;
248
249 while !(publication_done && subscription_done) && start.elapsed() < POLL_TIMEOUT {
250 if !publication_done {
251 match pub_poller.poll() {
252 Ok(Some(pub_)) => {
253 publication = Some(pub_);
254 publication_done = true;
255 }
256 Ok(None) => {}
257 Err(err) => {
258 info!("publication async add finished with error on iteration {i}: {err:?}");
259 publication_done = true;
260 }
261 }
262 }
263
264 if !subscription_done {
265 match sub_poller.poll() {
266 Ok(Some(sub_)) => {
267 subscription = Some(sub_);
268 subscription_done = true;
269 }
270 Ok(None) => {}
271 Err(err) => {
272 info!("subscription async add finished with error on iteration {i}: {err:?}");
273 subscription_done = true;
274 }
275 }
276 }
277
278 if !(publication_done && subscription_done) {
279 std::thread::sleep(POLL_SLEEP);
280 }
281 }
282
283 assert!(
284 publication_done,
285 "publication async add did not complete on iteration {i} within {:?}",
286 POLL_TIMEOUT
287 );
288 assert!(
289 subscription_done,
290 "subscription async add did not complete on iteration {i} within {:?}",
291 POLL_TIMEOUT
292 );
293
294 drop(subscription);
295 drop(publication);
296 drop(sub_poller);
297 drop(pub_poller);
298 }
299
300 drop(aeron);
301 stop.store(true, Ordering::SeqCst);
302 let _ = driver_handle.join().unwrap();
303 error_handler.release();
304 Ok(())
305 }
306
307 #[test]
308 #[serial]
309 fn async_subscription_invalid_interface_poll_then_drop() -> Result<(), Box<dyn error::Error>> {
311 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
312
313 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
314 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
315 media_driver_ctx.set_dir_delete_on_start(true)?;
316 media_driver_ctx.set_dir(
317 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
318 )?;
319 let (stop, driver_handle) =
320 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
321
322 let ctx = AeronContext::new()?;
323 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
324 let mut error_handler = Handler::leak(ErrorCount::default());
325 ctx.set_error_handler(Some(&error_handler))?;
326 let aeron = Aeron::new(&ctx)?;
327 aeron.start()?;
328
329 let channel = String::from("aeron:udp?endpoint=203.0.113.1:54323");
331
332 let poller = aeron.async_add_subscription(
333 &channel.into_c_string(),
334 4323,
335 Handlers::no_available_image_handler(),
336 Handlers::no_unavailable_image_handler(),
337 )?;
338
339 let start = Instant::now();
340 while start.elapsed() < Duration::from_millis(250) {
341 let _ = poller.poll();
342 #[cfg(debug_assertions)]
343 std::thread::sleep(Duration::from_millis(10));
344 }
345
346 drop(poller);
347 drop(aeron);
348 stop.store(true, Ordering::SeqCst);
349 let _ = driver_handle.join().unwrap();
350 error_handler.release();
351 Ok(())
352 }
353
354 #[test]
355 #[serial]
356 fn blocking_add_subscription_invalid_interface_timeout() -> Result<(), Box<dyn error::Error>> {
357 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
358
359 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
360 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
361 media_driver_ctx.set_dir_delete_on_start(true)?;
362 media_driver_ctx.set_dir(
363 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
364 )?;
365 let (stop, driver_handle) =
366 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
367
368 let ctx = AeronContext::new()?;
369 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
370 let mut error_handler = Handler::leak(ErrorCount::default());
371 ctx.set_error_handler(Some(&error_handler))?;
372 let aeron = Aeron::new(&ctx)?;
373 aeron.start()?;
374
375 let channel = String::from("aeron:udp?endpoint=203.0.113.1:54324");
376
377 let result = aeron.add_subscription(
378 &channel.into_c_string(),
379 4324,
380 Handlers::no_available_image_handler(),
381 Handlers::no_unavailable_image_handler(),
382 Duration::from_millis(300),
383 );
384
385 assert!(result.is_err(), "expected error for invalid interface");
386 drop(aeron);
387 stop.store(true, Ordering::SeqCst);
388 let _ = driver_handle.join().unwrap();
389 error_handler.release();
390 Ok(())
391 }
392
393 #[test]
394 #[serial]
395 fn async_publication_invalid_bind_poll_then_drop() -> Result<(), Box<dyn error::Error>> {
396 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
397
398 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
399 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
400 media_driver_ctx.set_dir_delete_on_start(true)?;
401 media_driver_ctx.set_dir(
402 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
403 )?;
404 let (stop, driver_handle) =
405 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
406
407 let ctx = AeronContext::new()?;
408 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
409 let mut error_handler = Handler::leak(ErrorCount::default());
410 ctx.set_error_handler(Some(&error_handler))?;
411 let aeron = Aeron::new(&ctx)?;
412 aeron.start()?;
413
414 let channel = format!("aeron:udp?endpoint=127.0.0.1:54330|bind=203.0.113.1:60000");
416
417 let poller = aeron.async_add_publication(&channel.into_c_string(), 4330)?;
418 let start = Instant::now();
419 while start.elapsed() < Duration::from_millis(250) {
420 let _ = poller.poll();
421 #[cfg(debug_assertions)]
422 std::thread::sleep(Duration::from_millis(10));
423 }
424 drop(poller);
425 drop(aeron);
426 stop.store(true, Ordering::SeqCst);
427 let _ = driver_handle.join().unwrap();
428 error_handler.release();
429 Ok(())
430 }
431
432 #[test]
433 #[serial]
434 pub fn simple_large_send() -> Result<(), Box<dyn error::Error>> {
435 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
436 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
437 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
438 media_driver_ctx.set_dir_delete_on_start(true)?;
439 media_driver_ctx.set_dir(
440 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
441 )?;
442 media_driver_ctx.set_client_liveness_timeout_ns(60_000_000_000)?; media_driver_ctx.set_image_liveness_timeout_ns(60_000_000_000)?; media_driver_ctx.set_publication_unblock_timeout_ns(65_000_000_000)?; media_driver_ctx.set_driver_timeout_ms(60_000)?; let (stop, driver_handle) =
449 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
450
451 let ctx = AeronContext::new()?;
452 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
453 assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
454 ctx.set_driver_timeout_ms(60_000)?;
456 let mut error_handler = Handler::leak(ErrorCount::default());
460 let mut new_pub_handler = Handler::leak(AeronNewPublicationLogger);
461 let mut avail_counter_handler1 = Handler::leak(AeronAvailableCounterLogger);
462 let mut close_client_handler = Handler::leak(AeronCloseClientLogger);
463 let mut new_sub_handler = Handler::leak(AeronNewSubscriptionLogger);
464 let mut unavail_counter_handler = Handler::leak(AeronUnavailableCounterLogger);
465 let mut avail_counter_handler2 = Handler::leak(AeronAvailableCounterLogger);
466 let mut excl_pub_handler = Handler::leak(AeronNewPublicationLogger);
467 ctx.set_error_handler(Some(&error_handler))?;
468 ctx.set_on_new_publication(Some(&new_pub_handler))?;
469 ctx.set_on_available_counter(Some(&avail_counter_handler1))?;
470 ctx.set_on_close_client(Some(&close_client_handler))?;
471 ctx.set_on_new_subscription(Some(&new_sub_handler))?;
472 ctx.set_on_unavailable_counter(Some(&unavail_counter_handler))?;
473 ctx.set_on_available_counter(Some(&avail_counter_handler2))?;
474 ctx.set_on_new_exclusive_publication(Some(&excl_pub_handler))?;
475
476 info!("creating client [simple_large_send test]");
477 let aeron = Aeron::new(&ctx)?;
478 info!("starting client");
479
480 aeron.start()?;
481 info!("client started");
482 let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
483 info!("created publisher");
484
485 assert!(AeronCncMetadata::load_from_file(ctx.get_dir())?.pid > 0);
486 let cstr = std::ffi::CString::new(ctx.get_dir()).unwrap();
487 AeronCncMetadata::read_from_file(&cstr, |cnc| {
488 assert!(cnc.pid > 0);
489 })?;
490 assert!(AeronCnc::new_on_heap(ctx.get_dir())?.get_to_driver_heartbeat_ms()? > 0);
491 let cstr = std::ffi::CString::new(ctx.get_dir()).unwrap();
492 for _ in 0..50 {
493 AeronCnc::read_on_partial_stack(&cstr, |cnc| {
494 assert!(cnc.get_to_driver_heartbeat_ms().unwrap() > 0);
495 })?;
496 }
497
498 let subscription = aeron.add_subscription(
499 AERON_IPC_STREAM,
500 123,
501 Handlers::no_available_image_handler(),
502 Handlers::no_unavailable_image_handler(),
503 Duration::from_secs(5),
504 )?;
505 info!("created subscription");
506
507 subscription
508 .poll_once(|msg, header| println!("foo"), 1024)
509 .unwrap();
510
511 let string_len = media_driver_ctx.ipc_mtu_length * 100;
513 info!("string length: {}", string_len);
514
515 let stop_publisher = Arc::new(AtomicBool::new(false));
516
517 let publisher_handler = {
518 let stop_publisher = stop_publisher.clone();
519 std::thread::spawn(move || {
520 let binding = "1".repeat(string_len);
521 let large_msg = binding.as_bytes();
522 loop {
523 if stop_publisher.load(Ordering::Acquire) || publisher.is_closed() {
524 break;
525 }
526 let result =
527 publisher.offer(large_msg, Handlers::no_reserved_value_supplier_handler());
528
529 assert_eq!(123, publisher.get_constants().unwrap().stream_id);
530
531 if result < large_msg.len() as i64 {
532 let error = AeronCError::from_code(result as i32);
533 match error.kind() {
534 AeronErrorType::PublicationBackPressured
535 | AeronErrorType::PublicationAdminAction => {
536 }
538 _ => {
539 error!(
540 "ERROR: failed to send message {:?}",
541 AeronCError::from_code(result as i32)
542 );
543 }
544 }
545 sleep(Duration::from_millis(500));
546 }
547 }
548 info!("stopping publisher thread");
549 })
550 };
551
552 let mut assembler = AeronFragmentClosureAssembler::new()?;
553
554 struct Context {
555 count: Arc<AtomicUsize>,
556 stop: Arc<AtomicBool>,
557 string_len: usize,
558 }
559
560 let count = Arc::new(AtomicUsize::new(0usize));
561 let mut context = Context {
562 count: count.clone(),
563 stop: stop.clone(),
564 string_len,
565 };
566
567 let start_time = Instant::now();
569
570 let loop_result: Result<(), Box<dyn error::Error>> = loop {
573 if start_time.elapsed() > Duration::from_secs(120) {
574 info!("Failed: exceeded 120-second timeout");
575 break Err(Box::new(std::io::Error::new(
576 std::io::ErrorKind::TimedOut,
577 "Timeout exceeded",
578 )));
579 }
580 let c = count.load(Ordering::SeqCst);
581 if c > 100 {
582 break Ok(());
583 }
584
585 fn process_msg(ctx: &mut Context, buffer: &[u8], header: AeronHeader) {
586 ctx.count.fetch_add(1, Ordering::SeqCst);
587
588 let values = header.get_values().unwrap();
589 assert_ne!(values.frame.session_id, 0);
590
591 if buffer.len() != ctx.string_len {
592 ctx.stop.store(true, Ordering::SeqCst);
593 error!(
594 "ERROR: message was {} but was expecting {} [header={:?}]",
595 buffer.len(),
596 ctx.string_len,
597 header
598 );
599 sleep(Duration::from_secs(1));
600 }
601
602 assert_eq!(buffer.len(), ctx.string_len);
603 assert_eq!(buffer, "1".repeat(ctx.string_len).as_bytes());
604 }
605
606 subscription.poll(assembler.process(&mut context, process_msg), 128)?;
607 assert_eq!(123, subscription.get_constants().unwrap().stream_id);
608 };
609
610 subscription.close(Handlers::no_notification_handler())?;
611
612 info!("stopping client");
613 stop_publisher.store(true, Ordering::SeqCst);
614
615 let _ = publisher_handler.join().unwrap();
616 drop(subscription);
617 drop(aeron);
618
619 stop.store(true, Ordering::SeqCst);
620 let _ = driver_handle.join().unwrap();
621
622 error_handler.release();
624 new_pub_handler.release();
625 avail_counter_handler1.release();
626 close_client_handler.release();
627 new_sub_handler.release();
628 unavail_counter_handler.release();
629 avail_counter_handler2.release();
630 excl_pub_handler.release();
631
632 let cnc = AeronCnc::new_on_heap(ctx.get_dir())?;
633 cnc.counters_reader().foreach_counter_once(
634 |value: i64, id: i32, type_id: i32, key: &[u8], label: &str| {
635 println!("counter reader id={id}, type_id={type_id}, key={key:?}, label={label}, value={value} [type={:?}]",
636 AeronSystemCounterType::try_from(type_id));
637 },
638 );
639 cnc.error_log_read_once(| observation_count: i32,
640 first_observation_timestamp: i64,
641 last_observation_timestamp: i64,
642 error: &str| {
643 println!("error: {error} observationCount={observation_count}, first_observation_timestamp={first_observation_timestamp}, last_observation_timestamp={last_observation_timestamp}");
644 }, 0);
645 cnc.loss_reporter_read_once(| observation_count: i64,
646 total_bytes_lost: i64,
647 first_observation_timestamp: i64,
648 last_observation_timestamp: i64,
649 session_id: i32,
650 stream_id: i32,
651 channel: &str,
652 source: &str,| {
653 println!("loss reporter observationCount={observation_count}, totalBytesLost={total_bytes_lost}, first_observed={first_observation_timestamp}, last_observed={last_observation_timestamp}, session_id={session_id}, stream_id={stream_id}, channel={channel} source={source}");
654 })?;
655
656 loop_result?;
657 Ok(())
658 }
659
660 #[test]
661 #[serial]
662 pub fn try_claim() -> Result<(), Box<dyn error::Error>> {
663 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
664 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
665 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
666 media_driver_ctx.set_dir_delete_on_start(true)?;
667 media_driver_ctx.set_dir(
668 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
669 )?;
670 media_driver_ctx.set_client_liveness_timeout_ns(60_000_000_000)?; media_driver_ctx.set_image_liveness_timeout_ns(60_000_000_000)?; media_driver_ctx.set_publication_unblock_timeout_ns(65_000_000_000)?; media_driver_ctx.set_driver_timeout_ms(60_000)?; let (stop, driver_handle) =
677 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
678
679 let ctx = AeronContext::new()?;
680 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
681 assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
682 ctx.set_driver_timeout_ms(60_000)?;
684 let mut error_handler = Handler::leak(ErrorCount::default());
685 ctx.set_error_handler(Some(&error_handler))?;
686
687 info!("creating client [try_claim test]");
688 let aeron = Aeron::new(&ctx)?;
689 info!("starting client");
690
691 aeron.start()?;
692 info!("client started");
693 const STREAM_ID: i32 = 123;
694 let publisher =
695 aeron.add_publication(AERON_IPC_STREAM, STREAM_ID, Duration::from_secs(5))?;
696 info!("created publisher");
697
698 let subscription = aeron.add_subscription(
699 AERON_IPC_STREAM,
700 STREAM_ID,
701 Handlers::no_available_image_handler(),
702 Handlers::no_unavailable_image_handler(),
703 Duration::from_secs(5),
704 )?;
705 info!("created subscription");
706
707 let string_len = 156;
709 info!("string length: {}", string_len);
710
711 let stop_publisher = Arc::new(AtomicBool::new(false));
712
713 let publisher_handler = {
714 let stop_publisher = stop_publisher.clone();
715 std::thread::spawn(move || {
716 let binding = "1".repeat(string_len);
717 let msg = binding.as_bytes();
718 let buffer = AeronBufferClaim::default();
719 loop {
720 if stop_publisher.load(Ordering::Acquire) || publisher.is_closed() {
721 break;
722 }
723
724 let result = publisher.try_claim(string_len, &buffer);
725
726 if result < msg.len() as i64 {
727 error!(
728 "ERROR: failed to send message {:?}",
729 AeronCError::from_code(result as i32)
730 );
731 } else {
732 buffer.data().write_all(&msg).unwrap();
733 buffer.commit().unwrap();
734 }
735 }
736 info!("stopping publisher thread");
737 })
738 };
739
740 let count = Arc::new(AtomicUsize::new(0usize));
741 let count_copy = Arc::clone(&count);
742 let stop2 = stop.clone();
743
744 struct FragmentHandler {
745 count_copy: Arc<AtomicUsize>,
746 stop2: Arc<AtomicBool>,
747 string_len: usize,
748 }
749
750 impl AeronFragmentHandlerCallback for FragmentHandler {
751 fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], header: AeronHeader) {
752 assert_eq!(STREAM_ID, header.get_values().unwrap().frame.stream_id);
753 let header = header.get_values().unwrap();
754 let frame = header.frame();
755 let stream_id = frame.stream_id();
756 assert_eq!(STREAM_ID, stream_id);
757
758 self.count_copy.fetch_add(1, Ordering::SeqCst);
759
760 if buffer.len() != self.string_len {
761 self.stop2.store(true, Ordering::SeqCst);
762 error!(
763 "ERROR: message was {} but was expecting {} [header={:?}]",
764 buffer.len(),
765 self.string_len,
766 header
767 );
768 sleep(Duration::from_secs(1));
769 }
770
771 assert_eq!(buffer.len(), self.string_len);
772 assert_eq!(buffer, "1".repeat(self.string_len).as_bytes());
773 }
774 }
775
776 let (mut closure, mut inner_handler) =
777 Handler::leak_with_fragment_assembler(FragmentHandler {
778 count_copy,
779 stop2,
780 string_len,
781 })?;
782 let loop_result: Result<(), Box<dyn error::Error>> = {
783 let start_time = Instant::now();
784 loop {
785 if start_time.elapsed() > Duration::from_secs(120) {
786 info!("Failed: exceeded 120-second timeout");
787 break Err(Box::new(std::io::Error::new(
788 std::io::ErrorKind::TimedOut,
789 "Timeout exceeded",
790 )));
791 }
792 let c = count.load(Ordering::SeqCst);
793 if c > 100 {
794 break Ok(());
795 }
796 subscription.poll(Some(&closure), 128)?;
797 }
798 };
799
800 info!("stopping client");
801
802 stop_publisher.store(true, Ordering::SeqCst);
803
804 let _ = publisher_handler.join().unwrap();
805 drop(subscription);
806 drop(aeron);
807
808 stop.store(true, Ordering::SeqCst);
809 let _ = driver_handle.join().unwrap();
810 closure.release();
811 inner_handler.release();
812 error_handler.release();
813 loop_result?;
814 Ok(())
815 }
816
817 #[test]
818 #[serial]
819 pub fn counters() -> Result<(), Box<dyn error::Error>> {
820 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
821 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
822 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
823 media_driver_ctx.set_dir_delete_on_start(true)?;
824 media_driver_ctx.set_dir(
825 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
826 )?;
827 let (stop, driver_handle) =
828 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
829
830 let ctx = AeronContext::new()?;
831 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
832 assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
833 let mut error_handler = Handler::leak(ErrorCount::default());
834 ctx.set_error_handler(Some(&error_handler))?;
835 let mut unavailable_counter_handler = Handler::leak(AeronUnavailableCounterLogger);
836 ctx.set_on_unavailable_counter(Some(&unavailable_counter_handler))?;
837
838 struct AvailableCounterHandler {
839 found_counter: bool,
840 }
841
842 impl AeronAvailableCounterCallback for AvailableCounterHandler {
843 fn handle_aeron_on_available_counter(
844 &mut self,
845 counters_reader: AeronCountersReader,
846 registration_id: i64,
847 counter_id: i32,
848 ) -> () {
849 info!(
850 "on counter key={:?}, label={:?} registration_id={registration_id}, counter_id={counter_id}, value={}, {counters_reader:?}",
851 String::from_utf8(counters_reader.get_counter_key(counter_id).unwrap()),
852 counters_reader.get_counter_label(counter_id, 1000),
853 counters_reader.addr(counter_id)
854 );
855
856 assert_eq!(
857 counters_reader.counter_registration_id(counter_id).unwrap(),
858 registration_id
859 );
860
861 if let Ok(label) = counters_reader.get_counter_label(counter_id, 1000) {
862 if label == "label_buffer" {
863 self.found_counter = true;
864 assert_eq!(
865 &counters_reader.get_counter_key(counter_id).unwrap(),
866 "key".as_bytes()
867 );
868 }
869 }
870 }
871 }
872
873 let mut available_counter_handler = Handler::leak(AvailableCounterHandler {
874 found_counter: false,
875 });
876 ctx.set_on_available_counter(Some(&available_counter_handler))?;
877
878 info!("creating client");
879 let aeron = Aeron::new(&ctx)?;
880 info!("starting client");
881
882 aeron.start()?;
883 info!("client started [counters test]");
884
885 let counter = aeron.add_counter(
886 123,
887 "key".as_bytes(),
888 "label_buffer",
889 Duration::from_secs(5),
890 )?;
891 let constants = counter.get_constants()?;
892 let counter_id = constants.counter_id;
893
894 let stop_publisher = Arc::new(AtomicBool::new(false));
895
896 let publisher_handler = {
897 let stop_publisher = stop_publisher.clone();
898 let counter = counter.clone();
899 std::thread::spawn(move || {
900 for _ in 0..150 {
901 if stop_publisher.load(Ordering::Acquire) || counter.is_closed() {
902 break;
903 }
904 counter.addr_atomic().fetch_add(1, Ordering::SeqCst);
905 }
906 info!("stopping publisher thread");
907 })
908 };
909
910 let now = Instant::now();
911 while counter.addr_atomic().load(Ordering::SeqCst) < 100
912 && now.elapsed() < Duration::from_secs(10)
913 {
914 sleep(Duration::from_micros(10));
915 }
916
917 assert!(now.elapsed() < Duration::from_secs(10));
918
919 info!(
920 "counter is {}",
921 counter.addr_atomic().load(Ordering::SeqCst)
922 );
923
924 info!("stopping client");
925
926 #[cfg(not(target_os = "windows"))] assert!(available_counter_handler.found_counter);
928
929 let reader = aeron.counters_reader();
930 assert_eq!(reader.get_counter_label(counter_id, 256)?, "label_buffer");
931 assert_eq!(reader.get_counter_key(counter_id)?, "key".as_bytes());
932 let buffers = AeronCountersReaderBuffers::default();
933 reader.get_buffers(&buffers)?;
934
935 stop_publisher.store(true, Ordering::SeqCst);
936
937 let _ = publisher_handler.join().unwrap();
938 drop(counter);
939 drop(aeron);
940
941 stop.store(true, Ordering::SeqCst);
942 let _ = driver_handle.join().unwrap();
943 available_counter_handler.release();
944 unavailable_counter_handler.release();
945 error_handler.release();
946 Ok(())
947 }
948
949 #[derive(Default, Debug)]
951 struct TestErrorCount {
952 pub error_count: usize,
953 }
954
955 impl Drop for TestErrorCount {
956 fn drop(&mut self) {
957 info!("TestErrorCount dropped with {} errors", self.error_count);
958 }
959 }
960
961 impl AeronErrorHandlerCallback for TestErrorCount {
962 fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
963 error!("Aeron error {}: {}", error_code, msg);
964 self.error_count += 1;
965 }
966 }
967
968 #[test]
969 #[serial]
970 pub fn backpressure_recovery_test() -> Result<(), Box<dyn error::Error>> {
971 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
972
973 let under_valgrind = running_under_valgrind();
974 let driver_timeout_ms = if under_valgrind { 180_000 } else { 60_000 };
975 let liveness_timeout_ns = if under_valgrind {
976 180_000_000_000
977 } else {
978 60_000_000_000
979 };
980 let poll_timeout = Duration::from_millis(driver_timeout_ms as u64);
981
982 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
983 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
984 media_driver_ctx.set_dir_delete_on_start(true)?;
985 media_driver_ctx.set_dir(
986 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
987 )?;
988 media_driver_ctx.set_client_liveness_timeout_ns(liveness_timeout_ns)?;
989 media_driver_ctx.set_image_liveness_timeout_ns(liveness_timeout_ns)?;
990 media_driver_ctx.set_publication_unblock_timeout_ns(liveness_timeout_ns + 5_000_000_000)?;
991 media_driver_ctx.set_driver_timeout_ms(driver_timeout_ms)?;
992 let (stop, driver_handle) =
993 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
994
995 let ctx = AeronContext::new()?;
996 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
997 ctx.set_driver_timeout_ms(driver_timeout_ms)?;
998 let mut error_handler = Handler::leak(TestErrorCount::default());
999 ctx.set_error_handler(Some(&error_handler))?;
1000
1001 let aeron = Aeron::new(&ctx)?;
1002 aeron.start()?;
1003
1004 let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
1005 let subscription = aeron.add_subscription(
1006 AERON_IPC_STREAM,
1007 123,
1008 Handlers::no_available_image_handler(),
1009 Handlers::no_unavailable_image_handler(),
1010 Duration::from_secs(5),
1011 )?;
1012
1013 let count = Arc::new(AtomicUsize::new(0));
1014 let start_time = Instant::now();
1015
1016 let stop_publisher = Arc::new(AtomicBool::new(false));
1017
1018 let publisher_thread = {
1020 let stop_publisher = stop_publisher.clone();
1021 std::thread::spawn(move || {
1022 while !stop_publisher.load(Ordering::Acquire) {
1023 let msg = b"test";
1024 let result =
1025 publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
1026 if result == AeronErrorType::PublicationBackPressured.code() as i64 {
1028 sleep(Duration::from_millis(50));
1029 }
1030 if publisher.is_closed() {
1031 break;
1032 }
1033 }
1034 })
1035 };
1036
1037 while count.load(Ordering::SeqCst) < 50 && start_time.elapsed() < poll_timeout {
1039 let _ = subscription.poll_once(
1040 |_msg, _header| {
1041 count.fetch_add(1, Ordering::SeqCst);
1042 },
1043 128,
1044 )?;
1045 }
1046
1047 stop_publisher.store(true, Ordering::SeqCst);
1048 publisher_thread.join().unwrap();
1049 drop(subscription);
1050 drop(aeron);
1051 stop.store(true, Ordering::SeqCst);
1052 let _ = driver_handle.join().unwrap();
1053 error_handler.release();
1054
1055 assert!(
1056 count.load(Ordering::SeqCst) >= 50,
1057 "Expected at least 50 messages received"
1058 );
1059 Ok(())
1060 }
1061
1062 #[test]
1063 #[serial]
1064 pub fn multi_subscription_test() -> Result<(), Box<dyn error::Error>> {
1065 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
1066
1067 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
1068 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
1069 media_driver_ctx.set_dir_delete_on_start(true)?;
1070 media_driver_ctx.set_dir(
1071 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
1072 )?;
1073 media_driver_ctx.set_client_liveness_timeout_ns(60_000_000_000)?;
1074 media_driver_ctx.set_image_liveness_timeout_ns(60_000_000_000)?;
1075 media_driver_ctx.set_publication_unblock_timeout_ns(65_000_000_000)?;
1076 media_driver_ctx.set_driver_timeout_ms(60_000)?;
1077 let (_stop, driver_handle) =
1078 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1079
1080 let ctx = AeronContext::new()?;
1081 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
1082 let mut error_handler = Handler::leak(TestErrorCount::default());
1083 ctx.set_error_handler(Some(&error_handler))?;
1084
1085 let aeron = Aeron::new(&ctx)?;
1086 aeron.start()?;
1087 let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
1088
1089 let subscription1 = aeron.add_subscription(
1091 AERON_IPC_STREAM,
1092 123,
1093 Handlers::no_available_image_handler(),
1094 Handlers::no_unavailable_image_handler(),
1095 Duration::from_secs(5),
1096 )?;
1097 let subscription2 = aeron.add_subscription(
1098 AERON_IPC_STREAM,
1099 123,
1100 Handlers::no_available_image_handler(),
1101 Handlers::no_unavailable_image_handler(),
1102 Duration::from_secs(5),
1103 )?;
1104
1105 let count1 = Arc::new(AtomicUsize::new(0));
1106 let count2 = Arc::new(AtomicUsize::new(0));
1107
1108 let msg = b"hello multi-subscription";
1110 let result = publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
1111 assert!(
1112 result >= msg.len() as i64,
1113 "Message should be sent successfully"
1114 );
1115
1116 let start_time = Instant::now();
1117 while (count1.load(Ordering::SeqCst) < 1 || count2.load(Ordering::SeqCst) < 1)
1119 && start_time.elapsed() < Duration::from_secs(5)
1120 {
1121 let _ = subscription1.poll_once(
1122 |_msg, _header| {
1123 count1.fetch_add(1, Ordering::SeqCst);
1124 },
1125 128,
1126 )?;
1127 let _ = subscription2.poll_once(
1128 |_msg, _header| {
1129 count2.fetch_add(1, Ordering::SeqCst);
1130 },
1131 128,
1132 )?;
1133 }
1134
1135 assert!(
1136 count1.load(Ordering::SeqCst) >= 1,
1137 "Subscription 1 did not receive the message"
1138 );
1139 assert!(
1140 count2.load(Ordering::SeqCst) >= 1,
1141 "Subscription 2 did not receive the message"
1142 );
1143
1144 drop(subscription2);
1145 drop(subscription1);
1146 drop(publisher);
1147 drop(aeron);
1148 _stop.store(true, Ordering::SeqCst);
1149 let _ = driver_handle.join().unwrap();
1150 error_handler.release();
1151 Ok(())
1152 }
1153
1154 #[test]
1155 #[serial]
1156 pub fn should_be_able_to_drop_after_close_manually_being_closed(
1157 ) -> Result<(), Box<dyn error::Error>> {
1158 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
1159
1160 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
1161 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
1162 media_driver_ctx.set_dir_delete_on_start(true)?;
1163 media_driver_ctx.set_dir(
1164 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
1165 )?;
1166 let (_stop, driver_handle) =
1167 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1168
1169 let ctx = AeronContext::new()?;
1170 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
1171 let mut error_handler = Handler::leak(AeronErrorHandlerLogger);
1172 ctx.set_error_handler(Some(&error_handler))?;
1173
1174 let aeron = Aeron::new(&ctx)?;
1175 aeron.start()?;
1176
1177 {
1178 let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
1179 info!("created publication [sessionId={}]", publisher.session_id());
1180 publisher.close_with_no_args()?;
1181 drop(publisher);
1182 }
1183
1184 {
1185 let publisher = aeron.add_publication(AERON_IPC_STREAM, 124, Duration::from_secs(5))?;
1186 info!("created publication [sessionId={}]", publisher.session_id());
1187 publisher.close(Handlers::no_notification_handler())?;
1188 drop(publisher);
1189 }
1190
1191 {
1192 let publisher = aeron.add_publication(AERON_IPC_STREAM, 125, Duration::from_secs(5))?;
1193 publisher.close_once(|| println!("on close"))?;
1194 info!("created publication [sessionId={}]", publisher.session_id());
1195 drop(publisher);
1196 }
1197
1198 drop(aeron);
1199 _stop.store(true, Ordering::SeqCst);
1200 let _ = driver_handle.join().unwrap();
1201 error_handler.release();
1202 Ok(())
1203 }
1204
1205 #[test]
1206 #[serial]
1207 pub fn offer_on_closed_publication_error_test() -> Result<(), Box<dyn error::Error>> {
1208 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
1209
1210 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
1211 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
1212 media_driver_ctx.set_dir_delete_on_start(true)?;
1213 media_driver_ctx.set_dir(
1214 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
1215 )?;
1216 let (_stop, driver_handle) =
1217 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1218
1219 let ctx = AeronContext::new()?;
1220 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
1221 let mut error_handler = Handler::leak(TestErrorCount::default());
1222 ctx.set_error_handler(Some(&error_handler))?;
1223
1224 let aeron = Aeron::new(&ctx)?;
1225 aeron.start()?;
1226 let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
1227
1228 publisher.close(Handlers::no_notification_handler())?;
1230
1231 let result = publisher.offer(
1233 b"should fail",
1234 Handlers::no_reserved_value_supplier_handler(),
1235 );
1236 assert!(
1237 result < 0,
1238 "Offering on a closed publication should return a negative error code"
1239 );
1240
1241 drop(publisher);
1242 drop(aeron);
1243 _stop.store(true, Ordering::SeqCst);
1244 let _ = driver_handle.join().unwrap();
1245 error_handler.release();
1246 Ok(())
1247 }
1248
1249 #[test]
1251 #[serial]
1252 pub fn empty_message_test() -> Result<(), Box<dyn error::Error>> {
1253 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
1254
1255 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
1256 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
1257 media_driver_ctx.set_dir_delete_on_start(true)?;
1258 media_driver_ctx.set_dir(
1259 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
1260 )?;
1261 let (_stop, driver_handle) =
1262 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1263
1264 let ctx = AeronContext::new()?;
1265 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
1266 let mut error_handler = Handler::leak(TestErrorCount::default());
1267 ctx.set_error_handler(Some(&error_handler))?;
1268
1269 let aeron = Aeron::new(&ctx)?;
1270 aeron.start()?;
1271 let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
1272 let subscription = aeron.add_subscription(
1273 AERON_IPC_STREAM,
1274 123,
1275 Handlers::no_available_image_handler(),
1276 Handlers::no_unavailable_image_handler(),
1277 Duration::from_secs(5),
1278 )?;
1279
1280 let empty_received = Arc::new(AtomicBool::new(false));
1281 let start_time = Instant::now();
1282
1283 let result = publisher.offer(b"", Handlers::no_reserved_value_supplier_handler());
1284 assert!(result > 0);
1285
1286 while !empty_received.load(Ordering::SeqCst)
1287 && start_time.elapsed() < Duration::from_secs(5)
1288 {
1289 let _ = subscription.poll_once(
1290 |msg, _header| {
1291 if msg.is_empty() {
1292 empty_received.store(true, Ordering::SeqCst);
1293 }
1294 },
1295 128,
1296 )?;
1297 }
1298
1299 assert!(
1300 empty_received.load(Ordering::SeqCst),
1301 "Empty message was not received"
1302 );
1303 drop(subscription);
1304 drop(publisher);
1305 drop(aeron);
1306 _stop.store(true, Ordering::SeqCst);
1307 let _ = driver_handle.join().unwrap();
1308 error_handler.release();
1309 Ok(())
1310 }
1311
1312 #[derive(Default, Debug)]
1313 struct MdcTotals {
1314 gap_events: u64,
1315 missing_messages: u64,
1316 received_messages: u64,
1317 }
1318
1319 #[derive(Debug)]
1320 struct MdcWindowStats {
1321 expected_seq: Option<u64>,
1322 gap_events: u64,
1323 missing_messages: u64,
1324 received_messages: u64,
1325 histogram: Histogram<u64>,
1326 }
1327
1328 impl MdcWindowStats {
1329 fn new() -> Result<Self, Box<dyn error::Error>> {
1330 Ok(Self {
1331 expected_seq: None,
1332 gap_events: 0,
1333 missing_messages: 0,
1334 received_messages: 0,
1335 histogram: Histogram::new(3)?,
1336 })
1337 }
1338
1339 fn observe(&mut self, seq: u64, sent_ts_ns: u64) {
1340 self.received_messages += 1;
1341
1342 match self.expected_seq {
1343 None => self.expected_seq = Some(seq.saturating_add(1)),
1344 Some(expected) if seq > expected => {
1345 self.gap_events += 1;
1346 self.missing_messages += seq - expected;
1347 self.expected_seq = Some(seq.saturating_add(1));
1348 }
1349 Some(expected) if seq == expected => {
1350 self.expected_seq = Some(expected.saturating_add(1));
1351 }
1352 Some(_) => {
1353 }
1355 }
1356
1357 let now_ns = Aeron::nano_clock().max(0) as u64;
1358 let latency_ns = now_ns.saturating_sub(sent_ts_ns);
1359 let _ = self.histogram.record(latency_ns);
1360 }
1361
1362 fn print_and_reset(
1363 &mut self,
1364 window_number: usize,
1365 interval: Duration,
1366 totals: &mut MdcTotals,
1367 ) {
1368 totals.gap_events += self.gap_events;
1369 totals.missing_messages += self.missing_messages;
1370 totals.received_messages += self.received_messages;
1371
1372 if self.histogram.len() > 0 {
1373 let min_us = self.histogram.min() / 1_000;
1374 let p50_us = self.histogram.value_at_quantile(0.50) / 1_000;
1375 let p99_us = self.histogram.value_at_quantile(0.99) / 1_000;
1376 let max_us = self.histogram.max() / 1_000;
1377 println!(
1378 "[mdc-window-{window_number}] interval={interval:?} received={} gaps={} missing={} latency_us[min={}, p50={}, p99={}, max={}]",
1379 self.received_messages, self.gap_events, self.missing_messages, min_us, p50_us, p99_us, max_us,
1380 );
1381 } else {
1382 println!(
1383 "[mdc-window-{window_number}] interval={interval:?} received=0 gaps=0 missing=0 latency_us[min=n/a, p50=n/a, p99=n/a, max=n/a]"
1384 );
1385 }
1386
1387 self.expected_seq = None;
1388 self.gap_events = 0;
1389 self.missing_messages = 0;
1390 self.received_messages = 0;
1391 self.histogram.reset();
1392 }
1393 }
1394
1395 #[test]
1405 #[serial]
1406 #[ignore] pub fn mdc_unreliable_gap_latency_histogram_report() -> Result<(), Box<dyn error::Error>> {
1408 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
1409
1410 const STREAM_ID: i32 = 32931;
1411 const CONTROL_PORT: u16 = 32929;
1412 const SUBSCRIBER_PORT: u16 = 32930;
1413 const MESSAGE_LEN: usize = 130;
1414
1415 let report_interval = Duration::from_secs(
1416 std::env::var("RUSTERON_MDC_REPORT_INTERVAL_SECS")
1417 .ok()
1418 .and_then(|value| value.parse::<u64>().ok())
1419 .unwrap_or(10),
1420 );
1421 let test_duration = Duration::from_secs(
1422 std::env::var("RUSTERON_MDC_TEST_DURATION_SECS")
1423 .ok()
1424 .and_then(|value| value.parse::<u64>().ok())
1425 .unwrap_or(300),
1426 );
1427 let mdc_host = std::env::var("RUSTERON_MDC_HOST").unwrap_or("127.0.0.1".to_string());
1428
1429 let publication_channel = format!(
1430 "aeron:udp?control-mode=manual|control={}:{CONTROL_PORT}",
1431 mdc_host
1432 );
1433 let subscription_channel = format!(
1436 "aeron:udp?endpoint={}:{SUBSCRIBER_PORT}|reliable=false|tether=false|group=false|nak-delay=500us",
1437 mdc_host
1438 );
1439 let destination_uri = format!("aeron:udp?endpoint={}:{SUBSCRIBER_PORT}", mdc_host);
1440
1441 let (media_driver_ctx, stop_driver, driver_handle) = start_media_driver(32930)?;
1442 let aeron_dir = media_driver_ctx.get_dir().to_string();
1443
1444 println!(
1445 "[mdc-start] publication={} subscription={} duration={:?} report_interval={:?}",
1446 publication_channel, subscription_channel, test_duration, report_interval
1447 );
1448
1449 let running = Arc::new(AtomicBool::new(true));
1450
1451 let subscriber_dir = aeron_dir.clone();
1452 let subscriber_channel = subscription_channel.clone();
1453 let subscriber_running = Arc::clone(&running);
1454 let subscriber_thread = std::thread::spawn(move || -> MdcTotals {
1455 let (_ctx, aeron) = create_client_for_dir(&subscriber_dir)
1456 .expect("failed to create subscriber aeron client");
1457
1458 let subscription = aeron
1459 .add_subscription(
1460 &subscriber_channel.into_c_string(),
1461 STREAM_ID,
1462 Handlers::no_available_image_handler(),
1463 Handlers::no_unavailable_image_handler(),
1464 Duration::from_secs(5),
1465 )
1466 .expect("failed to create subscriber");
1467
1468 let mut totals = MdcTotals::default();
1469 let mut window_stats = MdcWindowStats::new().expect("failed to create histogram");
1470 let test_start = Instant::now();
1471 let mut window_start = test_start;
1472 let mut window_number = 1usize;
1473
1474 while test_start.elapsed() < test_duration {
1475 let _ = subscription
1476 .poll_once(
1477 |msg, _header| {
1478 if msg.len() < 16 {
1479 return;
1480 }
1481
1482 let seq = u64::from_le_bytes(msg[0..8].try_into().unwrap());
1483 let sent_ts_ns = u64::from_le_bytes(msg[8..16].try_into().unwrap());
1484 window_stats.observe(seq, sent_ts_ns);
1485 },
1486 10_000,
1487 )
1488 .expect("subscriber poll failed");
1489
1490 if window_start.elapsed() >= report_interval {
1491 window_stats.print_and_reset(window_number, report_interval, &mut totals);
1492 window_number += 1;
1493 window_start = Instant::now();
1494 }
1495 }
1496
1497 window_stats.print_and_reset(window_number, report_interval, &mut totals);
1498 subscriber_running.store(false, Ordering::SeqCst);
1499 totals
1500 });
1501
1502 sleep(Duration::from_millis(250));
1504
1505 let publisher_dir = aeron_dir;
1506 let publisher_channel = publication_channel.clone();
1507 let publisher_destination = destination_uri.clone();
1508 let publisher_running = Arc::clone(&running);
1509 let publisher_thread = std::thread::spawn(move || -> u64 {
1510 let (_ctx, aeron) = create_client_for_dir(&publisher_dir)
1511 .expect("failed to create publisher aeron client");
1512
1513 let publication = aeron
1514 .add_exclusive_publication(
1515 &publisher_channel.into_c_string(),
1516 STREAM_ID,
1517 Duration::from_secs(5),
1518 )
1519 .expect("failed to create publication");
1520
1521 let add_destination =
1522 AeronAsyncDestination::aeron_exclusive_publication_async_add_destination(
1523 &aeron,
1524 &publication,
1525 &publisher_destination.into_c_string(),
1526 )
1527 .expect("failed to add manual MDC destination");
1528
1529 let add_destination_start = Instant::now();
1530 while add_destination
1531 .aeron_exclusive_publication_async_destination_poll()
1532 .expect("destination add poll failed")
1533 == 0
1534 {
1535 assert!(
1536 add_destination_start.elapsed() <= Duration::from_secs(5),
1537 "Timed out adding manual MDC destination"
1538 );
1539 sleep(Duration::from_millis(10));
1540 }
1541
1542 let connect_start = Instant::now();
1543 while !publication.is_connected() && connect_start.elapsed() < Duration::from_secs(5) {
1544 sleep(Duration::from_millis(10));
1545 }
1546 assert!(
1547 publication.is_connected(),
1548 "manual MDC publication did not connect to subscriber destination"
1549 );
1550
1551 let mut seq: u64 = 0;
1552 let mut payload = [0u8; MESSAGE_LEN];
1553 while publisher_running.load(Ordering::Acquire) {
1554 payload[0..8].copy_from_slice(&seq.to_le_bytes());
1555 let ts_ns = Aeron::nano_clock().max(0) as u64;
1556 payload[8..16].copy_from_slice(&ts_ns.to_le_bytes());
1557
1558 let result =
1559 publication.offer(&payload, Handlers::no_reserved_value_supplier_handler());
1560 if result > 0 {
1561 seq = seq.wrapping_add(1);
1562 }
1563 sleep(Duration::from_millis(1));
1564 }
1565 seq
1566 });
1567
1568 let totals = subscriber_thread.join().unwrap();
1569 running.store(false, Ordering::SeqCst);
1570 let sent_messages = publisher_thread.join().unwrap();
1571 stop_driver.store(true, Ordering::SeqCst);
1572 let _ = driver_handle.join().unwrap();
1573
1574 println!(
1575 "[mdc-summary] sent={} received={} total_gaps={} total_missing={}",
1576 sent_messages, totals.received_messages, totals.gap_events, totals.missing_messages
1577 );
1578
1579 assert!(sent_messages > 0, "publisher failed to send any messages");
1580 assert!(
1581 totals.received_messages > 0,
1582 "subscriber did not receive any messages"
1583 );
1584 Ok(())
1585 }
1586
1587 #[test]
1588 #[serial]
1589 #[ignore] pub fn tags() -> Result<(), Box<dyn error::Error>> {
1591 rusteron_code_gen::test_logger::init(log::LevelFilter::Debug);
1592
1593 let (md_ctx, stop, md) = start_media_driver(1)?;
1594
1595 let (_a_ctx2, aeron_sub) = create_client(&md_ctx)?;
1596
1597 info!("creating suscriber 1");
1598 let sub = aeron_sub
1599 .add_subscription(
1600 &"aeron:udp?tags=100".into_c_string(),
1601 123,
1602 Handlers::no_available_image_handler(),
1603 Handlers::no_unavailable_image_handler(),
1604 Duration::from_secs(50),
1605 )
1606 .map_err(|e| {
1607 error!("aeron error={}", Aeron::errmsg());
1608 e
1609 })?;
1610
1611 let ctx = AeronContext::new()?;
1612 ctx.set_dir(&aeron_sub.context().get_dir().into_c_string())?;
1613 let aeron = Aeron::new(&ctx)?;
1614 aeron.start()?;
1615
1616 info!("creating suscriber 2");
1617 let sub2 = aeron_sub.add_subscription(
1618 &"aeron:udp?tags=100".into_c_string(),
1619 123,
1620 Handlers::no_available_image_handler(),
1621 Handlers::no_unavailable_image_handler(),
1622 Duration::from_secs(50),
1623 )?;
1624
1625 let (_a_ctx1, aeron_pub) = create_client(&md_ctx)?;
1626 info!("creating publisher");
1627 assert!(!aeron_pub.is_closed());
1628 let publisher = aeron_pub
1629 .add_publication(
1630 &"aeron:udp?endpoint=localhost:4040|alias=test|tags=100".into_c_string(),
1631 123,
1632 Duration::from_secs(5),
1633 )
1634 .map_err(|e| {
1635 error!("aeron error={}", Aeron::errmsg());
1636 e
1637 })?;
1638
1639 info!("publishing msg");
1640
1641 loop {
1642 let result = publisher.offer(
1643 "213".as_bytes(),
1644 Handlers::no_reserved_value_supplier_handler(),
1645 );
1646 if result < 0 {
1647 error!(
1648 "failed to publish {:?}",
1649 AeronCError::from_code(result as i32)
1650 );
1651 } else {
1652 break;
1653 }
1654 }
1655
1656 sub.poll_once(
1657 |msg, _header| {
1658 println!("Received message: {:?}", msg);
1659 },
1660 128,
1661 )?;
1662 sub2.poll_once(
1663 |msg, _header| {
1664 println!("Received message: {:?}", msg);
1665 },
1666 128,
1667 )?;
1668
1669 stop.store(true, Ordering::SeqCst);
1670
1671 Ok(())
1672 }
1673
1674 fn create_client_for_dir(dir: &str) -> Result<(AeronContext, Aeron), Box<dyn Error>> {
1675 info!("creating aeron client [dir={}]", dir);
1676 let ctx = AeronContext::new()?;
1677 ctx.set_dir(&dir.into_c_string())?;
1678 let aeron = Aeron::new(&ctx)?;
1679 aeron.start()?;
1680 Ok((ctx, aeron))
1681 }
1682
1683 fn create_client(
1684 media_driver_ctx: &AeronDriverContext,
1685 ) -> Result<(AeronContext, Aeron), Box<dyn Error>> {
1686 let dir = media_driver_ctx.get_dir().to_string();
1687 create_client_for_dir(&dir)
1688 }
1689
1690 fn start_media_driver(
1691 instance: u64,
1692 ) -> Result<
1693 (
1694 AeronDriverContext,
1695 Arc<AtomicBool>,
1696 JoinHandle<Result<(), rusteron_media_driver::AeronCError>>,
1697 ),
1698 Box<dyn Error>,
1699 > {
1700 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
1701 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
1702 media_driver_ctx.set_dir_delete_on_start(true)?;
1703 media_driver_ctx.set_dir(
1704 &format!(
1705 "{}{}-{}",
1706 media_driver_ctx.get_dir(),
1707 Aeron::epoch_clock(),
1708 instance
1709 )
1710 .into_c_string(),
1711 )?;
1712 let (stop, driver_handle) =
1713 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1714 Ok((media_driver_ctx, stop, driver_handle))
1715 }
1716
1717 #[doc = include_str!("../../README.md")]
1718 mod readme_tests {}
1719}