1#![allow(improper_ctypes_definitions)]
2#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
9#[allow(improper_ctypes_definitions)]
19#[allow(unpredictable_function_pointer_comparisons)]
20pub mod bindings {
21 include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
22}
23
24use bindings::*;
25
26include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
27include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
28
29#[cfg(test)]
30mod tests {
31 use super::*;
32 use crate::test_alloc::current_allocs;
33 use hdrhistogram::Histogram;
34 use log::{error, info};
35 use rusteron_media_driver::AeronDriverContext;
36 use serial_test::serial;
37 use std::error;
38 use std::error::Error;
39 use std::io::Write;
40 use std::os::raw::c_int;
41 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
42 use std::sync::Arc;
43 use std::thread::{sleep, JoinHandle};
44 use std::time::{Duration, Instant};
45
46 #[derive(Default, Debug)]
47 struct ErrorCount {
48 error_count: usize,
49 }
50
51 impl AeronErrorHandlerCallback for ErrorCount {
52 fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
53 error!("Aeron error {}: {}", error_code, msg);
54 self.error_count += 1;
55 }
56 }
57
58 fn running_under_valgrind() -> bool {
59 std::env::var_os("RUSTERON_VALGRIND").is_some()
60 }
61
62 #[test]
63 #[serial]
64 fn version_check() -> Result<(), Box<dyn error::Error>> {
65 unsafe {
66 aeron_randomised_int32();
67 }
68 let alloc_count = current_allocs();
69
70 {
71 let major = unsafe { crate::aeron_version_major() };
72 let minor = unsafe { crate::aeron_version_minor() };
73 let patch = unsafe { crate::aeron_version_patch() };
74
75 let cargo_version = "1.51.0";
76 let aeron_version = format!("{}.{}.{}", major, minor, patch);
77 assert_eq!(aeron_version, cargo_version);
78
79 let ctx = AeronContext::new()?;
80 let error_count = 1;
81 let mut handler = Handler::leak(ErrorCount::default());
82 ctx.set_error_handler(Some(&handler))?;
83
84 assert!(Aeron::epoch_clock() > 0);
85 drop(ctx);
86 assert!(handler.should_drop);
87 handler.release();
88 assert!(!handler.should_drop);
89 drop(handler);
90 }
91
92 assert!(
93 current_allocs() <= alloc_count,
94 "allocations {} > {alloc_count}",
95 current_allocs()
96 );
97
98 Ok(())
99 }
100
101 #[test]
102 #[serial]
103 fn async_publication_invalid_interface_poll_then_drop() -> Result<(), Box<dyn error::Error>> {
104 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
105
106 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
107 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
108 media_driver_ctx.set_dir_delete_on_start(true)?;
109 media_driver_ctx.set_dir(
110 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
111 )?;
112 let (stop, driver_handle) =
113 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
114
115 let ctx = AeronContext::new()?;
116 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
117 let mut error_handler = Handler::leak(ErrorCount::default());
118 ctx.set_error_handler(Some(&error_handler))?;
119 let aeron = Aeron::new(&ctx)?;
120 aeron.start()?;
121
122 let channel = String::from("aeron:udp?endpoint=203.0.113.1:54321");
123
124 let pub_poller = aeron.async_add_publication(&channel.clone().into_c_string(), 4321)?;
127 let sub_poller = aeron.async_add_subscription(
128 &channel.into_c_string(),
129 4321,
130 Handlers::no_available_image_handler(),
131 Handlers::no_unavailable_image_handler(),
132 )?;
133
134 let mut publication: Option<AeronPublication> = None;
135 let mut subscription: Option<AeronSubscription> = None;
136 let start = Instant::now();
137 while start.elapsed() < Duration::from_secs(2) {
138 if publication.is_none() {
139 match pub_poller.poll() {
140 Ok(Some(p)) => publication = Some(p),
141 Ok(None) | Err(_) => {}
142 }
143 }
144 if subscription.is_none() {
145 match sub_poller.poll() {
146 Ok(Some(s)) => subscription = Some(s),
147 Ok(None) | Err(_) => {}
148 }
149 }
150 if publication.is_some() && subscription.is_some() {
151 break;
152 }
153 #[cfg(debug_assertions)]
154 std::thread::sleep(Duration::from_millis(10));
155 }
156
157 info!("publication: {:?}", publication);
158 info!("subscription: {:?}", subscription);
159
160 if let (Some(publisher), Some(subscription)) = (publication, subscription) {
161 let payload = b"hello-aeron";
162 let send_start = Instant::now();
163 let mut sent = false;
164 while send_start.elapsed() < Duration::from_millis(500) {
165 let res = publisher.offer(payload, Handlers::no_reserved_value_supplier_handler());
166 if res >= payload.len() as i64 {
167 sent = true;
168 info!("sent {:?}", payload);
169 break;
170 }
171 std::thread::sleep(Duration::from_millis(10));
172 }
173
174 if sent {
175 let mut got = false;
176 let read_start = Instant::now();
177 while read_start.elapsed() < Duration::from_millis(500) {
178 let _ = subscription.poll_once(
179 |msg, _hdr| {
180 if msg == payload {
181 got = true;
182 info!("received {:?}", payload);
183 }
184 },
185 1024,
186 );
187 if got {
188 break;
189 }
190 std::thread::sleep(Duration::from_millis(10));
191 }
192 }
194 }
195
196 drop(sub_poller);
198 drop(pub_poller);
199 drop(aeron);
200 stop.store(true, Ordering::SeqCst);
201 let _ = driver_handle.join().unwrap();
202 error_handler.release();
203 Ok(())
204 }
205
206 #[test]
207 #[serial]
208 fn async_pub_sub_invalid_endpoint_create_drop_stress() -> Result<(), Box<dyn error::Error>> {
209 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
210
211 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
212 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
213 media_driver_ctx.set_dir_delete_on_start(true)?;
214 media_driver_ctx.set_dir(
215 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
216 )?;
217 let (stop, driver_handle) =
218 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
219
220 let ctx = AeronContext::new()?;
221 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
222 let mut error_handler = Handler::leak(ErrorCount::default());
223 ctx.set_error_handler(Some(&error_handler))?;
224 let aeron = Aeron::new(&ctx)?;
225 aeron.start()?;
226
227 const STRESS_ITERS: u16 = 60;
228 const POLL_TIMEOUT: Duration = Duration::from_secs(10);
229 const POLL_SLEEP: Duration = Duration::from_millis(10);
230
231 for i in 0..STRESS_ITERS {
234 let port = 55000u16 + i;
235 let channel = format!("aeron:udp?endpoint=203.0.113.1:{}", port);
236 let pub_poller =
237 aeron.async_add_publication(&channel.clone().into_c_string(), 4500 + i as i32)?;
238 let sub_poller = aeron.async_add_subscription(
239 &channel.into_c_string(),
240 4500 + i as i32,
241 Handlers::no_available_image_handler(),
242 Handlers::no_unavailable_image_handler(),
243 )?;
244
245 let start = Instant::now();
246 let mut publication = None;
247 let mut publication_done = false;
248 let mut subscription = None;
249 let mut subscription_done = false;
250
251 while !(publication_done && subscription_done) && start.elapsed() < POLL_TIMEOUT {
252 if !publication_done {
253 match pub_poller.poll() {
254 Ok(Some(pub_)) => {
255 publication = Some(pub_);
256 publication_done = true;
257 }
258 Ok(None) => {}
259 Err(err) => {
260 info!("publication async add finished with error on iteration {i}: {err:?}");
261 publication_done = true;
262 }
263 }
264 }
265
266 if !subscription_done {
267 match sub_poller.poll() {
268 Ok(Some(sub_)) => {
269 subscription = Some(sub_);
270 subscription_done = true;
271 }
272 Ok(None) => {}
273 Err(err) => {
274 info!("subscription async add finished with error on iteration {i}: {err:?}");
275 subscription_done = true;
276 }
277 }
278 }
279
280 if !(publication_done && subscription_done) {
281 std::thread::sleep(POLL_SLEEP);
282 }
283 }
284
285 assert!(
286 publication_done,
287 "publication async add did not complete on iteration {i} within {:?}",
288 POLL_TIMEOUT
289 );
290 assert!(
291 subscription_done,
292 "subscription async add did not complete on iteration {i} within {:?}",
293 POLL_TIMEOUT
294 );
295
296 drop(subscription);
297 drop(publication);
298 drop(sub_poller);
299 drop(pub_poller);
300 }
301
302 drop(aeron);
303 stop.store(true, Ordering::SeqCst);
304 let _ = driver_handle.join().unwrap();
305 error_handler.release();
306 Ok(())
307 }
308
309 #[test]
310 #[serial]
311 fn async_subscription_invalid_interface_poll_then_drop() -> Result<(), Box<dyn error::Error>> {
313 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
314
315 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
316 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
317 media_driver_ctx.set_dir_delete_on_start(true)?;
318 media_driver_ctx.set_dir(
319 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
320 )?;
321 let (stop, driver_handle) =
322 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
323
324 let ctx = AeronContext::new()?;
325 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
326 let mut error_handler = Handler::leak(ErrorCount::default());
327 ctx.set_error_handler(Some(&error_handler))?;
328 let aeron = Aeron::new(&ctx)?;
329 aeron.start()?;
330
331 let channel = String::from("aeron:udp?endpoint=203.0.113.1:54323");
333
334 let poller = aeron.async_add_subscription(
335 &channel.into_c_string(),
336 4323,
337 Handlers::no_available_image_handler(),
338 Handlers::no_unavailable_image_handler(),
339 )?;
340
341 let start = Instant::now();
342 while start.elapsed() < Duration::from_millis(250) {
343 let _ = poller.poll();
344 #[cfg(debug_assertions)]
345 std::thread::sleep(Duration::from_millis(10));
346 }
347
348 drop(poller);
349 drop(aeron);
350 stop.store(true, Ordering::SeqCst);
351 let _ = driver_handle.join().unwrap();
352 error_handler.release();
353 Ok(())
354 }
355
356 #[test]
357 #[serial]
358 fn blocking_add_subscription_invalid_interface_timeout() -> Result<(), Box<dyn error::Error>> {
359 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
360
361 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
362 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
363 media_driver_ctx.set_dir_delete_on_start(true)?;
364 media_driver_ctx.set_dir(
365 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
366 )?;
367 let (stop, driver_handle) =
368 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
369
370 let ctx = AeronContext::new()?;
371 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
372 let mut error_handler = Handler::leak(ErrorCount::default());
373 ctx.set_error_handler(Some(&error_handler))?;
374 let aeron = Aeron::new(&ctx)?;
375 aeron.start()?;
376
377 let channel = String::from("aeron:udp?endpoint=203.0.113.1:54324");
378
379 let result = aeron.add_subscription(
380 &channel.into_c_string(),
381 4324,
382 Handlers::no_available_image_handler(),
383 Handlers::no_unavailable_image_handler(),
384 Duration::from_millis(300),
385 );
386
387 assert!(result.is_err(), "expected error for invalid interface");
388 drop(aeron);
389 stop.store(true, Ordering::SeqCst);
390 let _ = driver_handle.join().unwrap();
391 error_handler.release();
392 Ok(())
393 }
394
395 #[test]
396 #[serial]
397 fn async_publication_invalid_bind_poll_then_drop() -> Result<(), Box<dyn error::Error>> {
398 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
399
400 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
401 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
402 media_driver_ctx.set_dir_delete_on_start(true)?;
403 media_driver_ctx.set_dir(
404 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
405 )?;
406 let (stop, driver_handle) =
407 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
408
409 let ctx = AeronContext::new()?;
410 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
411 let mut error_handler = Handler::leak(ErrorCount::default());
412 ctx.set_error_handler(Some(&error_handler))?;
413 let aeron = Aeron::new(&ctx)?;
414 aeron.start()?;
415
416 let channel = format!("aeron:udp?endpoint=127.0.0.1:54330|bind=203.0.113.1:60000");
418
419 let poller = aeron.async_add_publication(&channel.into_c_string(), 4330)?;
420 let start = Instant::now();
421 while start.elapsed() < Duration::from_millis(250) {
422 let _ = poller.poll();
423 #[cfg(debug_assertions)]
424 std::thread::sleep(Duration::from_millis(10));
425 }
426 drop(poller);
427 drop(aeron);
428 stop.store(true, Ordering::SeqCst);
429 let _ = driver_handle.join().unwrap();
430 error_handler.release();
431 Ok(())
432 }
433
434 #[test]
435 #[serial]
436 pub fn simple_large_send() -> Result<(), Box<dyn error::Error>> {
437 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
438 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
439 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
440 media_driver_ctx.set_dir_delete_on_start(true)?;
441 media_driver_ctx.set_dir(
442 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
443 )?;
444 media_driver_ctx.set_client_liveness_timeout_ns(60_000_000_000)?; media_driver_ctx.set_image_liveness_timeout_ns(60_000_000_000)?; media_driver_ctx.set_publication_unblock_timeout_ns(65_000_000_000)?; media_driver_ctx.set_driver_timeout_ms(60_000)?; let (stop, driver_handle) =
451 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
452
453 let ctx = AeronContext::new()?;
454 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
455 assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
456 ctx.set_driver_timeout_ms(60_000)?;
458 let mut error_handler = Handler::leak(ErrorCount::default());
462 let mut new_pub_handler = Handler::leak(AeronNewPublicationLogger);
463 let mut avail_counter_handler1 = Handler::leak(AeronAvailableCounterLogger);
464 let mut close_client_handler = Handler::leak(AeronCloseClientLogger);
465 let mut new_sub_handler = Handler::leak(AeronNewSubscriptionLogger);
466 let mut unavail_counter_handler = Handler::leak(AeronUnavailableCounterLogger);
467 let mut avail_counter_handler2 = Handler::leak(AeronAvailableCounterLogger);
468 let mut excl_pub_handler = Handler::leak(AeronNewPublicationLogger);
469 ctx.set_error_handler(Some(&error_handler))?;
470 ctx.set_on_new_publication(Some(&new_pub_handler))?;
471 ctx.set_on_available_counter(Some(&avail_counter_handler1))?;
472 ctx.set_on_close_client(Some(&close_client_handler))?;
473 ctx.set_on_new_subscription(Some(&new_sub_handler))?;
474 ctx.set_on_unavailable_counter(Some(&unavail_counter_handler))?;
475 ctx.set_on_available_counter(Some(&avail_counter_handler2))?;
476 ctx.set_on_new_exclusive_publication(Some(&excl_pub_handler))?;
477
478 info!("creating client [simple_large_send test]");
479 let aeron = Aeron::new(&ctx)?;
480 info!("starting client");
481
482 aeron.start()?;
483 info!("client started");
484 let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
485 info!("created publisher");
486
487 assert!(AeronCncMetadata::load_from_file(ctx.get_dir())?.pid > 0);
488 let cstr = std::ffi::CString::new(ctx.get_dir()).unwrap();
489 AeronCncMetadata::read_from_file(&cstr, |cnc| {
490 assert!(cnc.pid > 0);
491 })?;
492 assert!(AeronCnc::new_on_heap(ctx.get_dir())?.get_to_driver_heartbeat_ms()? > 0);
493 let cstr = std::ffi::CString::new(ctx.get_dir()).unwrap();
494 for _ in 0..50 {
495 AeronCnc::read_on_partial_stack(&cstr, |cnc| {
496 assert!(cnc.get_to_driver_heartbeat_ms().unwrap() > 0);
497 })?;
498 }
499
500 let subscription = aeron.add_subscription(
501 AERON_IPC_STREAM,
502 123,
503 Handlers::no_available_image_handler(),
504 Handlers::no_unavailable_image_handler(),
505 Duration::from_secs(5),
506 )?;
507 info!("created subscription");
508
509 subscription
510 .poll_once(|msg, header| println!("foo"), 1024)
511 .unwrap();
512
513 let string_len = media_driver_ctx.ipc_mtu_length * 100;
515 info!("string length: {}", string_len);
516
517 let stop_publisher = Arc::new(AtomicBool::new(false));
518
519 let publisher_handler = {
520 let stop_publisher = stop_publisher.clone();
521 std::thread::spawn(move || {
522 let binding = "1".repeat(string_len);
523 let large_msg = binding.as_bytes();
524 loop {
525 if stop_publisher.load(Ordering::Acquire) || publisher.is_closed() {
526 break;
527 }
528 let result =
529 publisher.offer(large_msg, Handlers::no_reserved_value_supplier_handler());
530
531 assert_eq!(123, publisher.get_constants().unwrap().stream_id);
532
533 if result < large_msg.len() as i64 {
534 let error = AeronCError::from_code(result as i32);
535 match error.kind() {
536 AeronErrorType::PublicationBackPressured
537 | AeronErrorType::PublicationAdminAction => {
538 }
540 _ => {
541 error!(
542 "ERROR: failed to send message {:?}",
543 AeronCError::from_code(result as i32)
544 );
545 }
546 }
547 sleep(Duration::from_millis(500));
548 }
549 }
550 info!("stopping publisher thread");
551 })
552 };
553
554 let mut assembler = AeronFragmentClosureAssembler::new()?;
555
556 struct Context {
557 count: Arc<AtomicUsize>,
558 stop: Arc<AtomicBool>,
559 string_len: usize,
560 }
561
562 let count = Arc::new(AtomicUsize::new(0usize));
563 let mut context = Context {
564 count: count.clone(),
565 stop: stop.clone(),
566 string_len,
567 };
568
569 let start_time = Instant::now();
571
572 let loop_result: Result<(), Box<dyn error::Error>> = loop {
575 if start_time.elapsed() > Duration::from_secs(120) {
576 info!("Failed: exceeded 120-second timeout");
577 break Err(Box::new(std::io::Error::new(
578 std::io::ErrorKind::TimedOut,
579 "Timeout exceeded",
580 )));
581 }
582 let c = count.load(Ordering::SeqCst);
583 if c > 100 {
584 break Ok(());
585 }
586
587 fn process_msg(ctx: &mut Context, buffer: &[u8], header: AeronHeader) {
588 ctx.count.fetch_add(1, Ordering::SeqCst);
589
590 let values = header.get_values().unwrap();
591 assert_ne!(values.frame.session_id, 0);
592
593 if buffer.len() != ctx.string_len {
594 ctx.stop.store(true, Ordering::SeqCst);
595 error!(
596 "ERROR: message was {} but was expecting {} [header={:?}]",
597 buffer.len(),
598 ctx.string_len,
599 header
600 );
601 sleep(Duration::from_secs(1));
602 }
603
604 assert_eq!(buffer.len(), ctx.string_len);
605 assert_eq!(buffer, "1".repeat(ctx.string_len).as_bytes());
606 }
607
608 subscription.poll(assembler.process(&mut context, process_msg), 128)?;
609 assert_eq!(123, subscription.get_constants().unwrap().stream_id);
610 };
611
612 subscription.close(Handlers::no_notification_handler())?;
613
614 info!("stopping client");
615 stop_publisher.store(true, Ordering::SeqCst);
616
617 let _ = publisher_handler.join().unwrap();
618 drop(subscription);
619 drop(aeron);
620
621 stop.store(true, Ordering::SeqCst);
622 let _ = driver_handle.join().unwrap();
623
624 error_handler.release();
626 new_pub_handler.release();
627 avail_counter_handler1.release();
628 close_client_handler.release();
629 new_sub_handler.release();
630 unavail_counter_handler.release();
631 avail_counter_handler2.release();
632 excl_pub_handler.release();
633
634 let cnc = AeronCnc::new_on_heap(ctx.get_dir())?;
635 cnc.counters_reader().foreach_counter_once(
636 |value: i64, id: i32, type_id: i32, key: &[u8], label: &str| {
637 println!("counter reader id={id}, type_id={type_id}, key={key:?}, label={label}, value={value} [type={:?}]",
638 AeronSystemCounterType::try_from(type_id));
639 },
640 );
641 cnc.error_log_read_once(| observation_count: i32,
642 first_observation_timestamp: i64,
643 last_observation_timestamp: i64,
644 error: &str| {
645 println!("error: {error} observationCount={observation_count}, first_observation_timestamp={first_observation_timestamp}, last_observation_timestamp={last_observation_timestamp}");
646 }, 0);
647 cnc.loss_reporter_read_once(| observation_count: i64,
648 total_bytes_lost: i64,
649 first_observation_timestamp: i64,
650 last_observation_timestamp: i64,
651 session_id: i32,
652 stream_id: i32,
653 channel: &str,
654 source: &str,| {
655 println!("loss reporter observationCount={observation_count}, totalBytesLost={total_bytes_lost}, first_observed={first_observation_timestamp}, last_observed={last_observation_timestamp}, session_id={session_id}, stream_id={stream_id}, channel={channel} source={source}");
656 })?;
657
658 loop_result?;
659 Ok(())
660 }
661
662 #[test]
663 #[serial]
664 pub fn try_claim() -> Result<(), Box<dyn error::Error>> {
665 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
666 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
667 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
668 media_driver_ctx.set_dir_delete_on_start(true)?;
669 media_driver_ctx.set_dir(
670 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
671 )?;
672 media_driver_ctx.set_client_liveness_timeout_ns(60_000_000_000)?; media_driver_ctx.set_image_liveness_timeout_ns(60_000_000_000)?; media_driver_ctx.set_publication_unblock_timeout_ns(65_000_000_000)?; media_driver_ctx.set_driver_timeout_ms(60_000)?; let (stop, driver_handle) =
679 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
680
681 let ctx = AeronContext::new()?;
682 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
683 assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
684 ctx.set_driver_timeout_ms(60_000)?;
686 let mut error_handler = Handler::leak(ErrorCount::default());
687 ctx.set_error_handler(Some(&error_handler))?;
688
689 info!("creating client [try_claim test]");
690 let aeron = Aeron::new(&ctx)?;
691 info!("starting client");
692
693 aeron.start()?;
694 info!("client started");
695 const STREAM_ID: i32 = 123;
696 let publisher =
697 aeron.add_publication(AERON_IPC_STREAM, STREAM_ID, Duration::from_secs(5))?;
698 info!("created publisher");
699
700 let subscription = aeron.add_subscription(
701 AERON_IPC_STREAM,
702 STREAM_ID,
703 Handlers::no_available_image_handler(),
704 Handlers::no_unavailable_image_handler(),
705 Duration::from_secs(5),
706 )?;
707 info!("created subscription");
708
709 let string_len = 156;
711 info!("string length: {}", string_len);
712
713 let stop_publisher = Arc::new(AtomicBool::new(false));
714
715 let publisher_handler = {
716 let stop_publisher = stop_publisher.clone();
717 std::thread::spawn(move || {
718 let binding = "1".repeat(string_len);
719 let msg = binding.as_bytes();
720 let buffer = AeronBufferClaim::default();
721 loop {
722 if stop_publisher.load(Ordering::Acquire) || publisher.is_closed() {
723 break;
724 }
725
726 let result = publisher.try_claim(string_len, &buffer);
727
728 if result < msg.len() as i64 {
729 error!(
730 "ERROR: failed to send message {:?}",
731 AeronCError::from_code(result as i32)
732 );
733 } else {
734 buffer.data().write_all(&msg).unwrap();
735 buffer.commit().unwrap();
736 }
737 }
738 info!("stopping publisher thread");
739 })
740 };
741
742 let count = Arc::new(AtomicUsize::new(0usize));
743 let count_copy = Arc::clone(&count);
744 let stop2 = stop.clone();
745
746 struct FragmentHandler {
747 count_copy: Arc<AtomicUsize>,
748 stop2: Arc<AtomicBool>,
749 string_len: usize,
750 }
751
752 impl AeronFragmentHandlerCallback for FragmentHandler {
753 fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], header: AeronHeader) {
754 assert_eq!(STREAM_ID, header.get_values().unwrap().frame.stream_id);
755 let header = header.get_values().unwrap();
756 let frame = header.frame();
757 let stream_id = frame.stream_id();
758 assert_eq!(STREAM_ID, stream_id);
759
760 self.count_copy.fetch_add(1, Ordering::SeqCst);
761
762 if buffer.len() != self.string_len {
763 self.stop2.store(true, Ordering::SeqCst);
764 error!(
765 "ERROR: message was {} but was expecting {} [header={:?}]",
766 buffer.len(),
767 self.string_len,
768 header
769 );
770 sleep(Duration::from_secs(1));
771 }
772
773 assert_eq!(buffer.len(), self.string_len);
774 assert_eq!(buffer, "1".repeat(self.string_len).as_bytes());
775 }
776 }
777
778 let (mut closure, mut inner_handler) =
779 Handler::leak_with_fragment_assembler(FragmentHandler {
780 count_copy,
781 stop2,
782 string_len,
783 })?;
784 let loop_result: Result<(), Box<dyn error::Error>> = {
785 let start_time = Instant::now();
786 loop {
787 if start_time.elapsed() > Duration::from_secs(120) {
788 info!("Failed: exceeded 120-second timeout");
789 break Err(Box::new(std::io::Error::new(
790 std::io::ErrorKind::TimedOut,
791 "Timeout exceeded",
792 )));
793 }
794 let c = count.load(Ordering::SeqCst);
795 if c > 100 {
796 break Ok(());
797 }
798 subscription.poll(Some(&closure), 128)?;
799 }
800 };
801
802 info!("stopping client");
803
804 stop_publisher.store(true, Ordering::SeqCst);
805
806 let _ = publisher_handler.join().unwrap();
807 drop(subscription);
808 drop(aeron);
809
810 stop.store(true, Ordering::SeqCst);
811 let _ = driver_handle.join().unwrap();
812 closure.release();
813 inner_handler.release();
814 error_handler.release();
815 loop_result?;
816 Ok(())
817 }
818
819 #[test]
820 #[serial]
821 pub fn counters() -> Result<(), Box<dyn error::Error>> {
822 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
823 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
824 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
825 media_driver_ctx.set_dir_delete_on_start(true)?;
826 media_driver_ctx.set_dir(
827 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
828 )?;
829 let (stop, driver_handle) =
830 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
831
832 let ctx = AeronContext::new()?;
833 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
834 assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
835 let mut error_handler = Handler::leak(ErrorCount::default());
836 ctx.set_error_handler(Some(&error_handler))?;
837 let mut unavailable_counter_handler = Handler::leak(AeronUnavailableCounterLogger);
838 ctx.set_on_unavailable_counter(Some(&unavailable_counter_handler))?;
839
840 struct AvailableCounterHandler {
841 found_counter: bool,
842 }
843
844 impl AeronAvailableCounterCallback for AvailableCounterHandler {
845 fn handle_aeron_on_available_counter(
846 &mut self,
847 counters_reader: AeronCountersReader,
848 registration_id: i64,
849 counter_id: i32,
850 ) -> () {
851 info!(
852 "on counter key={:?}, label={:?} registration_id={registration_id}, counter_id={counter_id}, value={}, {counters_reader:?}",
853 String::from_utf8(counters_reader.get_counter_key(counter_id).unwrap()),
854 counters_reader.get_counter_label(counter_id, 1000),
855 counters_reader.addr(counter_id)
856 );
857
858 assert_eq!(
859 counters_reader.counter_registration_id(counter_id).unwrap(),
860 registration_id
861 );
862
863 if let Ok(label) = counters_reader.get_counter_label(counter_id, 1000) {
864 if label == "label_buffer" {
865 self.found_counter = true;
866 assert_eq!(
867 &counters_reader.get_counter_key(counter_id).unwrap(),
868 "key".as_bytes()
869 );
870 }
871 }
872 }
873 }
874
875 let mut available_counter_handler = Handler::leak(AvailableCounterHandler {
876 found_counter: false,
877 });
878 ctx.set_on_available_counter(Some(&available_counter_handler))?;
879
880 info!("creating client");
881 let aeron = Aeron::new(&ctx)?;
882 info!("starting client");
883
884 aeron.start()?;
885 info!("client started [counters test]");
886
887 let counter = aeron.add_counter(
888 123,
889 "key".as_bytes(),
890 "label_buffer",
891 Duration::from_secs(5),
892 )?;
893 let constants = counter.get_constants()?;
894 let counter_id = constants.counter_id;
895
896 let stop_publisher = Arc::new(AtomicBool::new(false));
897
898 let publisher_handler = {
899 let stop_publisher = stop_publisher.clone();
900 let counter = counter.clone();
901 std::thread::spawn(move || {
902 for _ in 0..150 {
903 if stop_publisher.load(Ordering::Acquire) || counter.is_closed() {
904 break;
905 }
906 counter.addr_atomic().fetch_add(1, Ordering::SeqCst);
907 }
908 info!("stopping publisher thread");
909 })
910 };
911
912 let now = Instant::now();
913 while counter.addr_atomic().load(Ordering::SeqCst) < 100
914 && now.elapsed() < Duration::from_secs(10)
915 {
916 sleep(Duration::from_micros(10));
917 }
918
919 assert!(now.elapsed() < Duration::from_secs(10));
920
921 info!(
922 "counter is {}",
923 counter.addr_atomic().load(Ordering::SeqCst)
924 );
925
926 info!("stopping client");
927
928 #[cfg(not(target_os = "windows"))] assert!(available_counter_handler.found_counter);
930
931 let reader = aeron.counters_reader();
932 assert_eq!(reader.get_counter_label(counter_id, 256)?, "label_buffer");
933 assert_eq!(reader.get_counter_key(counter_id)?, "key".as_bytes());
934 let buffers = AeronCountersReaderBuffers::default();
935 reader.get_buffers(&buffers)?;
936
937 stop_publisher.store(true, Ordering::SeqCst);
938
939 let _ = publisher_handler.join().unwrap();
940 drop(counter);
941 drop(aeron);
942
943 stop.store(true, Ordering::SeqCst);
944 let _ = driver_handle.join().unwrap();
945 available_counter_handler.release();
946 unavailable_counter_handler.release();
947 error_handler.release();
948 Ok(())
949 }
950
951 #[derive(Default, Debug)]
953 struct TestErrorCount {
954 pub error_count: usize,
955 }
956
957 impl Drop for TestErrorCount {
958 fn drop(&mut self) {
959 info!("TestErrorCount dropped with {} errors", self.error_count);
960 }
961 }
962
963 impl AeronErrorHandlerCallback for TestErrorCount {
964 fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
965 error!("Aeron error {}: {}", error_code, msg);
966 self.error_count += 1;
967 }
968 }
969
/// Checks that a subscriber keeps receiving messages while a publisher thread
/// offers as fast as it can and backs off whenever it is back-pressured.
///
/// A background thread offers a 4-byte payload in a loop, sleeping 50 ms each
/// time the offer reports back pressure. The test thread polls the
/// subscription until at least 50 fragments have arrived or the poll timeout
/// elapses. All timeouts are tripled when running under valgrind.
///
/// # Errors
///
/// Returns any error raised while configuring the driver or client, or while
/// polling the subscription. A polling error is reported only after the
/// publisher thread and the embedded driver have been shut down.
#[test]
#[serial]
pub fn backpressure_recovery_test() -> Result<(), Box<dyn error::Error>> {
    rusteron_code_gen::test_logger::init(log::LevelFilter::Info);

    let under_valgrind = running_under_valgrind();
    let driver_timeout_ms = if under_valgrind { 180_000 } else { 60_000 };
    let liveness_timeout_ns = if under_valgrind {
        180_000_000_000
    } else {
        60_000_000_000
    };
    let poll_timeout = Duration::from_millis(driver_timeout_ms as u64);

    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    // Append the epoch clock so this driver gets a directory of its own.
    media_driver_ctx.set_dir(
        &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
    )?;
    media_driver_ctx.set_client_liveness_timeout_ns(liveness_timeout_ns)?;
    media_driver_ctx.set_image_liveness_timeout_ns(liveness_timeout_ns)?;
    media_driver_ctx.set_publication_unblock_timeout_ns(liveness_timeout_ns + 5_000_000_000)?;
    media_driver_ctx.set_driver_timeout_ms(driver_timeout_ms)?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    ctx.set_driver_timeout_ms(driver_timeout_ms)?;
    let mut error_handler = Handler::leak(TestErrorCount::default());
    ctx.set_error_handler(Some(&error_handler))?;

    let aeron = Aeron::new(&ctx)?;
    aeron.start()?;

    let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
    let subscription = aeron.add_subscription(
        AERON_IPC_STREAM,
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(5),
    )?;

    let count = Arc::new(AtomicUsize::new(0));
    let start_time = Instant::now();

    let stop_publisher = Arc::new(AtomicBool::new(false));

    // The publication is moved into the thread and dropped when the thread ends.
    let publisher_thread = {
        let stop_publisher = stop_publisher.clone();
        std::thread::spawn(move || {
            while !stop_publisher.load(Ordering::Acquire) {
                let msg = b"test";
                let result =
                    publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
                if result == AeronErrorType::PublicationBackPressured.code() as i64 {
                    sleep(Duration::from_millis(50));
                }
                if publisher.is_closed() {
                    break;
                }
            }
        })
    };

    // Poll inside a closure so that a polling error cannot skip the shutdown
    // sequence below and leave the publisher thread and driver running.
    let poll_outcome = (|| -> Result<(), Box<dyn error::Error>> {
        while count.load(Ordering::SeqCst) < 50 && start_time.elapsed() < poll_timeout {
            let _ = subscription.poll_once(
                |_msg, _header| {
                    count.fetch_add(1, Ordering::SeqCst);
                },
                128,
            )?;
        }
        Ok(())
    })();

    stop_publisher.store(true, Ordering::SeqCst);
    publisher_thread.join().unwrap();
    drop(subscription);
    drop(aeron);
    stop.store(true, Ordering::SeqCst);
    let _ = driver_handle.join().unwrap();
    error_handler.release();

    // Report failures only once everything has been torn down.
    poll_outcome?;
    assert!(
        count.load(Ordering::SeqCst) >= 50,
        "Expected at least 50 messages received"
    );
    Ok(())
}
1063
/// Publishes one message on an IPC stream and checks that two subscriptions
/// on the same channel and stream id each receive it.
///
/// Both subscriptions are polled in turn for up to five seconds. The offer
/// result and the per-subscription counters are asserted only after the client
/// and the embedded driver have been shut down, so a failing assertion does
/// not leave the driver running.
///
/// # Errors
///
/// Returns any error raised while configuring the driver or client, or while
/// polling either subscription.
#[test]
#[serial]
pub fn multi_subscription_test() -> Result<(), Box<dyn error::Error>> {
    rusteron_code_gen::test_logger::init(log::LevelFilter::Info);

    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    // Append the epoch clock so this driver gets a directory of its own.
    media_driver_ctx.set_dir(
        &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
    )?;
    media_driver_ctx.set_client_liveness_timeout_ns(60_000_000_000)?;
    media_driver_ctx.set_image_liveness_timeout_ns(60_000_000_000)?;
    media_driver_ctx.set_publication_unblock_timeout_ns(65_000_000_000)?;
    media_driver_ctx.set_driver_timeout_ms(60_000)?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    let mut error_handler = Handler::leak(TestErrorCount::default());
    ctx.set_error_handler(Some(&error_handler))?;

    let aeron = Aeron::new(&ctx)?;
    aeron.start()?;
    let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;

    let subscription1 = aeron.add_subscription(
        AERON_IPC_STREAM,
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(5),
    )?;
    let subscription2 = aeron.add_subscription(
        AERON_IPC_STREAM,
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(5),
    )?;

    let count1 = Arc::new(AtomicUsize::new(0));
    let count2 = Arc::new(AtomicUsize::new(0));

    let msg = b"hello multi-subscription";
    let result = publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
    // There is nothing to wait for if the offer itself was rejected.
    let offered = result >= msg.len() as i64;

    let start_time = Instant::now();
    // Poll inside a closure so that a polling error cannot skip the shutdown
    // sequence below.
    let poll_outcome = (|| -> Result<(), Box<dyn error::Error>> {
        while offered
            && (count1.load(Ordering::SeqCst) < 1 || count2.load(Ordering::SeqCst) < 1)
            && start_time.elapsed() < Duration::from_secs(5)
        {
            let _ = subscription1.poll_once(
                |_msg, _header| {
                    count1.fetch_add(1, Ordering::SeqCst);
                },
                128,
            )?;
            let _ = subscription2.poll_once(
                |_msg, _header| {
                    count2.fetch_add(1, Ordering::SeqCst);
                },
                128,
            )?;
        }
        Ok(())
    })();

    drop(subscription2);
    drop(subscription1);
    drop(publisher);
    drop(aeron);
    stop.store(true, Ordering::SeqCst);
    let _ = driver_handle.join().unwrap();
    error_handler.release();

    // Report failures only once everything has been torn down.
    poll_outcome?;
    assert!(
        result >= msg.len() as i64,
        "Message should be sent successfully"
    );
    assert!(
        count1.load(Ordering::SeqCst) >= 1,
        "Subscription 1 did not receive the message"
    );
    assert!(
        count2.load(Ordering::SeqCst) >= 1,
        "Subscription 2 did not receive the message"
    );
    Ok(())
}
1155
/// Closes publications by hand in three different ways and then drops each
/// handle, checking that an explicit close followed by a drop completes
/// without an error being returned.
///
/// # Errors
///
/// Returns any error raised while configuring the driver or client, adding a
/// publication, or closing one.
#[test]
#[serial]
pub fn should_be_able_to_drop_after_close_manually_being_closed(
) -> Result<(), Box<dyn error::Error>> {
    rusteron_code_gen::test_logger::init(log::LevelFilter::Info);

    let driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    driver_ctx.set_dir_delete_on_shutdown(true)?;
    driver_ctx.set_dir_delete_on_start(true)?;
    // Append the epoch clock so this driver gets a directory of its own.
    let unique_dir = format!("{}{}", driver_ctx.get_dir(), Aeron::epoch_clock());
    driver_ctx.set_dir(&unique_dir.into_c_string())?;
    let (stop_driver, driver_thread) =
        rusteron_media_driver::AeronDriver::launch_embedded(driver_ctx.clone(), false);

    let client_ctx = AeronContext::new()?;
    client_ctx.set_dir(&driver_ctx.get_dir().into_c_string())?;
    let mut error_handler = Handler::leak(AeronErrorHandlerLogger);
    client_ctx.set_error_handler(Some(&error_handler))?;

    let client = Aeron::new(&client_ctx)?;
    client.start()?;

    // Variant 1: close without passing any arguments.
    {
        let publication = client.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
        info!(
            "created publication [sessionId={}]",
            publication.session_id()
        );
        publication.close_with_no_args()?;
        drop(publication);
    }

    // Variant 2: close with the "no notification" handler.
    {
        let publication = client.add_publication(AERON_IPC_STREAM, 124, Duration::from_secs(5))?;
        info!(
            "created publication [sessionId={}]",
            publication.session_id()
        );
        publication.close(Handlers::no_notification_handler())?;
        drop(publication);
    }

    // Variant 3: close with a one-shot closure.
    // NOTE(review): unlike the two variants above, the session id is read
    // after the close call here; confirm this ordering is intentional.
    {
        let publication = client.add_publication(AERON_IPC_STREAM, 125, Duration::from_secs(5))?;
        publication.close_once(|| println!("on close"))?;
        info!(
            "created publication [sessionId={}]",
            publication.session_id()
        );
        drop(publication);
    }

    // Tear down the client before stopping the driver.
    drop(client);
    stop_driver.store(true, Ordering::SeqCst);
    let _ = driver_thread.join().unwrap();
    error_handler.release();
    Ok(())
}
1206
/// Checks that offering on a publication that has already been closed yields
/// a negative result code.
///
/// The close result and the offer result are captured first and checked only
/// after the client and the embedded driver have been shut down, so a failure
/// does not leave the driver running.
///
/// # Errors
///
/// Returns any error raised while configuring the driver or client, or by the
/// explicit close of the publication.
#[test]
#[serial]
pub fn offer_on_closed_publication_error_test() -> Result<(), Box<dyn error::Error>> {
    rusteron_code_gen::test_logger::init(log::LevelFilter::Info);

    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    // Append the epoch clock so this driver gets a directory of its own.
    media_driver_ctx.set_dir(
        &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
    )?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    let mut error_handler = Handler::leak(TestErrorCount::default());
    ctx.set_error_handler(Some(&error_handler))?;

    let aeron = Aeron::new(&ctx)?;
    aeron.start()?;
    let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;

    // Capture both outcomes now; they are checked after shutdown.
    let close_result = publisher.close(Handlers::no_notification_handler());

    let result = publisher.offer(
        b"should fail",
        Handlers::no_reserved_value_supplier_handler(),
    );

    drop(publisher);
    drop(aeron);
    stop.store(true, Ordering::SeqCst);
    let _ = driver_handle.join().unwrap();
    error_handler.release();

    // Report failures only once everything has been torn down.
    close_result?;
    assert!(
        result < 0,
        "Offering on a closed publication should return a negative error code"
    );
    Ok(())
}
1250
/// Checks that a zero-length message can be offered on an IPC stream and is
/// delivered to a subscriber as an empty fragment.
///
/// The subscription is polled for up to five seconds. The offer result and
/// the "received" flag are asserted only after the client and the embedded
/// driver have been shut down, so a failing assertion does not leave the
/// driver running.
///
/// # Errors
///
/// Returns any error raised while configuring the driver or client, or while
/// polling the subscription.
#[test]
#[serial]
pub fn empty_message_test() -> Result<(), Box<dyn error::Error>> {
    rusteron_code_gen::test_logger::init(log::LevelFilter::Info);

    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    // Append the epoch clock so this driver gets a directory of its own.
    media_driver_ctx.set_dir(
        &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
    )?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    let mut error_handler = Handler::leak(TestErrorCount::default());
    ctx.set_error_handler(Some(&error_handler))?;

    let aeron = Aeron::new(&ctx)?;
    aeron.start()?;
    let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
    let subscription = aeron.add_subscription(
        AERON_IPC_STREAM,
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(5),
    )?;

    let empty_received = Arc::new(AtomicBool::new(false));
    let start_time = Instant::now();

    let result = publisher.offer(b"", Handlers::no_reserved_value_supplier_handler());
    // There is nothing to wait for if the offer itself was rejected.
    let offered = result > 0;

    // Poll inside a closure so that a polling error cannot skip the shutdown
    // sequence below.
    let poll_outcome = (|| -> Result<(), Box<dyn error::Error>> {
        while offered
            && !empty_received.load(Ordering::SeqCst)
            && start_time.elapsed() < Duration::from_secs(5)
        {
            let _ = subscription.poll_once(
                |msg, _header| {
                    if msg.is_empty() {
                        empty_received.store(true, Ordering::SeqCst);
                    }
                },
                128,
            )?;
        }
        Ok(())
    })();

    drop(subscription);
    drop(publisher);
    drop(aeron);
    stop.store(true, Ordering::SeqCst);
    let _ = driver_handle.join().unwrap();
    error_handler.release();

    // Report failures only once everything has been torn down.
    poll_outcome?;
    assert!(result > 0);
    assert!(
        empty_received.load(Ordering::SeqCst),
        "Empty message was not received"
    );
    Ok(())
}
1313
/// Whole-run counters for the MDC report test, accumulated from each
/// reporting window when that window is printed.
#[derive(Debug, Default)]
struct MdcTotals {
    /// Sum of the per-window gap event counts.
    gap_events: u64,
    /// Sum of the per-window missing message counts.
    missing_messages: u64,
    /// Sum of the per-window received message counts.
    received_messages: u64,
}
1320
/// Statistics gathered by the MDC report test for a single reporting window.
#[derive(Debug)]
struct MdcWindowStats {
    /// Sequence number the next in-order message should carry, or `None`
    /// while no baseline has been established.
    expected_seq: Option<u64>,
    /// Number of times a received sequence number was ahead of the expected one.
    gap_events: u64,
    /// Total count of sequence numbers skipped by those gaps.
    missing_messages: u64,
    /// Number of messages observed in this window.
    received_messages: u64,
    /// Per-message latency samples, recorded in nanoseconds.
    histogram: Histogram<u64>,
}
1329
1330 impl MdcWindowStats {
1331 fn new() -> Result<Self, Box<dyn error::Error>> {
1332 Ok(Self {
1333 expected_seq: None,
1334 gap_events: 0,
1335 missing_messages: 0,
1336 received_messages: 0,
1337 histogram: Histogram::new(3)?,
1338 })
1339 }
1340
1341 fn observe(&mut self, seq: u64, sent_ts_ns: u64) {
1342 self.received_messages += 1;
1343
1344 match self.expected_seq {
1345 None => self.expected_seq = Some(seq.saturating_add(1)),
1346 Some(expected) if seq > expected => {
1347 self.gap_events += 1;
1348 self.missing_messages += seq - expected;
1349 self.expected_seq = Some(seq.saturating_add(1));
1350 }
1351 Some(expected) if seq == expected => {
1352 self.expected_seq = Some(expected.saturating_add(1));
1353 }
1354 Some(_) => {
1355 }
1357 }
1358
1359 let now_ns = Aeron::nano_clock().max(0) as u64;
1360 let latency_ns = now_ns.saturating_sub(sent_ts_ns);
1361 let _ = self.histogram.record(latency_ns);
1362 }
1363
1364 fn print_and_reset(
1365 &mut self,
1366 window_number: usize,
1367 interval: Duration,
1368 totals: &mut MdcTotals,
1369 ) {
1370 totals.gap_events += self.gap_events;
1371 totals.missing_messages += self.missing_messages;
1372 totals.received_messages += self.received_messages;
1373
1374 if self.histogram.len() > 0 {
1375 let min_us = self.histogram.min() / 1_000;
1376 let p50_us = self.histogram.value_at_quantile(0.50) / 1_000;
1377 let p99_us = self.histogram.value_at_quantile(0.99) / 1_000;
1378 let max_us = self.histogram.max() / 1_000;
1379 println!(
1380 "[mdc-window-{window_number}] interval={interval:?} received={} gaps={} missing={} latency_us[min={}, p50={}, p99={}, max={}]",
1381 self.received_messages, self.gap_events, self.missing_messages, min_us, p50_us, p99_us, max_us,
1382 );
1383 } else {
1384 println!(
1385 "[mdc-window-{window_number}] interval={interval:?} received=0 gaps=0 missing=0 latency_us[min=n/a, p50=n/a, p99=n/a, max=n/a]"
1386 );
1387 }
1388
1389 self.expected_seq = None;
1390 self.gap_events = 0;
1391 self.missing_messages = 0;
1392 self.received_messages = 0;
1393 self.histogram.reset();
1394 }
1395 }
1396
/// Long-running, ignored-by-default report: streams sequenced, timestamped
/// messages over a manual-control-mode MDC publication to an unreliable UDP
/// subscription and prints per-window gap and latency statistics.
///
/// Environment variables read:
/// - `RUSTERON_MDC_REPORT_INTERVAL_SECS`: window length in seconds (default 10)
/// - `RUSTERON_MDC_TEST_DURATION_SECS`: total run time in seconds (default 300)
/// - `RUSTERON_MDC_HOST`: host used in every channel URI (default `127.0.0.1`)
///
/// The subscriber thread runs for the test duration and then clears the
/// shared `running` flag, which stops the publisher thread.
///
/// # Errors
///
/// Returns an error if the embedded media driver cannot be started.
///
/// # Panics
///
/// Panics if either worker thread panicked, or if nothing was sent or
/// received. Worker panics are re-raised only after the other worker and the
/// driver have been shut down.
#[test]
#[serial]
#[ignore]
pub fn mdc_unreliable_gap_latency_histogram_report() -> Result<(), Box<dyn error::Error>> {
    rusteron_code_gen::test_logger::init(log::LevelFilter::Info);

    const STREAM_ID: i32 = 32931;
    const CONTROL_PORT: u16 = 32929;
    const SUBSCRIBER_PORT: u16 = 32930;
    const MESSAGE_LEN: usize = 130;

    let report_interval = Duration::from_secs(
        std::env::var("RUSTERON_MDC_REPORT_INTERVAL_SECS")
            .ok()
            .and_then(|value| value.parse::<u64>().ok())
            .unwrap_or(10),
    );
    let test_duration = Duration::from_secs(
        std::env::var("RUSTERON_MDC_TEST_DURATION_SECS")
            .ok()
            .and_then(|value| value.parse::<u64>().ok())
            .unwrap_or(300),
    );
    let mdc_host =
        std::env::var("RUSTERON_MDC_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());

    let publication_channel = format!(
        "aeron:udp?control-mode=manual|control={}:{CONTROL_PORT}",
        mdc_host
    );
    let subscription_channel = format!(
        "aeron:udp?endpoint={}:{SUBSCRIBER_PORT}|reliable=false|tether=false|group=false|nak-delay=500us",
        mdc_host
    );
    let destination_uri = format!("aeron:udp?endpoint={}:{SUBSCRIBER_PORT}", mdc_host);

    let (media_driver_ctx, stop_driver, driver_handle) = start_media_driver(32930)?;
    let aeron_dir = media_driver_ctx.get_dir().to_string();

    println!(
        "[mdc-start] publication={} subscription={} duration={:?} report_interval={:?}",
        publication_channel, subscription_channel, test_duration, report_interval
    );

    // Shared run flag: cleared by the subscriber when the test duration is
    // over (and by this thread during shutdown) to stop the publisher.
    let running = Arc::new(AtomicBool::new(true));

    let subscriber_dir = aeron_dir.clone();
    let subscriber_running = Arc::clone(&running);
    let subscriber_thread = std::thread::spawn(move || -> MdcTotals {
        let (_ctx, aeron) = create_client_for_dir(&subscriber_dir)
            .expect("failed to create subscriber aeron client");

        let subscription = aeron
            .add_subscription(
                &subscription_channel.into_c_string(),
                STREAM_ID,
                Handlers::no_available_image_handler(),
                Handlers::no_unavailable_image_handler(),
                Duration::from_secs(5),
            )
            .expect("failed to create subscriber");

        let mut totals = MdcTotals::default();
        let mut window_stats = MdcWindowStats::new().expect("failed to create histogram");
        let test_start = Instant::now();
        let mut window_start = test_start;
        let mut window_number = 1usize;

        while test_start.elapsed() < test_duration {
            let _ = subscription
                .poll_once(
                    |msg, _header| {
                        // Payload layout: bytes 0..8 sequence, 8..16 send
                        // timestamp, both little-endian u64.
                        if msg.len() < 16 {
                            return;
                        }

                        let seq = u64::from_le_bytes(msg[0..8].try_into().unwrap());
                        let sent_ts_ns = u64::from_le_bytes(msg[8..16].try_into().unwrap());
                        window_stats.observe(seq, sent_ts_ns);
                    },
                    10_000,
                )
                .expect("subscriber poll failed");

            if window_start.elapsed() >= report_interval {
                window_stats.print_and_reset(window_number, report_interval, &mut totals);
                window_number += 1;
                window_start = Instant::now();
            }
        }

        // Flush whatever was gathered in the final, possibly partial, window.
        window_stats.print_and_reset(window_number, report_interval, &mut totals);
        subscriber_running.store(false, Ordering::SeqCst);
        totals
    });

    // Start the publisher a little after the subscriber thread.
    sleep(Duration::from_millis(250));

    let publisher_dir = aeron_dir;
    let publisher_running = Arc::clone(&running);
    let publisher_thread = std::thread::spawn(move || -> u64 {
        let (_ctx, aeron) = create_client_for_dir(&publisher_dir)
            .expect("failed to create publisher aeron client");

        let publication = aeron
            .add_exclusive_publication(
                &publication_channel.into_c_string(),
                STREAM_ID,
                Duration::from_secs(5),
            )
            .expect("failed to create publication");

        let add_destination =
            AeronAsyncDestination::aeron_exclusive_publication_async_add_destination(
                &aeron,
                &publication,
                &destination_uri.into_c_string(),
            )
            .expect("failed to add manual MDC destination");

        let add_destination_start = Instant::now();
        while add_destination
            .aeron_exclusive_publication_async_destination_poll()
            .expect("destination add poll failed")
            == 0
        {
            assert!(
                add_destination_start.elapsed() <= Duration::from_secs(5),
                "Timed out adding manual MDC destination"
            );
            sleep(Duration::from_millis(10));
        }

        let connect_start = Instant::now();
        while !publication.is_connected() && connect_start.elapsed() < Duration::from_secs(5) {
            sleep(Duration::from_millis(10));
        }
        assert!(
            publication.is_connected(),
            "manual MDC publication did not connect to subscriber destination"
        );

        let mut seq: u64 = 0;
        let mut payload = [0u8; MESSAGE_LEN];
        while publisher_running.load(Ordering::Acquire) {
            payload[0..8].copy_from_slice(&seq.to_le_bytes());
            let ts_ns = Aeron::nano_clock().max(0) as u64;
            payload[8..16].copy_from_slice(&ts_ns.to_le_bytes());

            let result =
                publication.offer(&payload, Handlers::no_reserved_value_supplier_handler());
            // Only advance the sequence for messages that were accepted.
            if result > 0 {
                seq = seq.wrapping_add(1);
            }
            sleep(Duration::from_millis(1));
        }
        seq
    });

    // Join both workers and stop the driver before unwrapping any result, so
    // a panic in one worker cannot leave the other worker or the driver
    // running behind a failed test.
    let subscriber_result = subscriber_thread.join();
    running.store(false, Ordering::SeqCst);
    let publisher_result = publisher_thread.join();
    stop_driver.store(true, Ordering::SeqCst);
    let _ = driver_handle.join().unwrap();

    let totals = subscriber_result.unwrap();
    let sent_messages = publisher_result.unwrap();

    println!(
        "[mdc-summary] sent={} received={} total_gaps={} total_missing={}",
        sent_messages, totals.received_messages, totals.gap_events, totals.missing_messages
    );

    assert!(sent_messages > 0, "publisher failed to send any messages");
    assert!(
        totals.received_messages > 0,
        "subscriber did not receive any messages"
    );
    Ok(())
}
1588
/// Ignored-by-default experiment with channel tags: two subscriptions on
/// `aeron:udp?tags=100` and one publication carrying the same tag, followed
/// by a single poll of each subscription that prints whatever arrives.
///
/// The offer is retried for at most five seconds with a short pause between
/// attempts. All clients are dropped and the embedded driver is stopped and
/// joined before the outcome is reported.
///
/// # Errors
///
/// Returns any error raised while starting the driver, creating clients,
/// adding the subscriptions or publication, or polling.
///
/// # Panics
///
/// Panics if no offer was accepted before the retry deadline.
#[test]
#[serial]
#[ignore]
pub fn tags() -> Result<(), Box<dyn error::Error>> {
    rusteron_code_gen::test_logger::init(log::LevelFilter::Debug);

    let (md_ctx, stop, md) = start_media_driver(1)?;

    let (_a_ctx2, aeron_sub) = create_client(&md_ctx)?;

    info!("creating suscriber 1");
    let sub = aeron_sub
        .add_subscription(
            &"aeron:udp?tags=100".into_c_string(),
            123,
            Handlers::no_available_image_handler(),
            Handlers::no_unavailable_image_handler(),
            Duration::from_secs(50),
        )
        .map_err(|e| {
            error!("aeron error={}", Aeron::errmsg());
            e
        })?;

    // An additional client attached to the same driver directory.
    // NOTE(review): the second subscription below is created on `aeron_sub`,
    // not on this client; confirm which one was intended.
    let ctx = AeronContext::new()?;
    ctx.set_dir(&aeron_sub.context().get_dir().into_c_string())?;
    let aeron = Aeron::new(&ctx)?;
    aeron.start()?;

    info!("creating suscriber 2");
    let sub2 = aeron_sub.add_subscription(
        &"aeron:udp?tags=100".into_c_string(),
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(50),
    )?;

    let (_a_ctx1, aeron_pub) = create_client(&md_ctx)?;
    info!("creating publisher");
    assert!(!aeron_pub.is_closed());
    let publisher = aeron_pub
        .add_publication(
            &"aeron:udp?endpoint=localhost:4040|alias=test|tags=100".into_c_string(),
            123,
            Duration::from_secs(5),
        )
        .map_err(|e| {
            error!("aeron error={}", Aeron::errmsg());
            e
        })?;

    info!("publishing msg");

    // Retry until the offer is accepted, but give up after a deadline and
    // pause between attempts instead of spinning forever.
    let offer_deadline = Instant::now() + Duration::from_secs(5);
    let published = loop {
        let result = publisher.offer(
            "213".as_bytes(),
            Handlers::no_reserved_value_supplier_handler(),
        );
        if result >= 0 {
            break true;
        }
        error!(
            "failed to publish {:?}",
            AeronCError::from_code(result as i32)
        );
        if Instant::now() >= offer_deadline {
            break false;
        }
        sleep(Duration::from_millis(10));
    };

    // Poll inside a closure so that a polling error cannot skip the shutdown
    // sequence below.
    let poll_outcome = (|| -> Result<(), Box<dyn error::Error>> {
        if published {
            sub.poll_once(
                |msg, _header| {
                    println!("Received message: {:?}", msg);
                },
                128,
            )?;
            sub2.poll_once(
                |msg, _header| {
                    println!("Received message: {:?}", msg);
                },
                128,
            )?;
        }
        Ok(())
    })();

    // Release every client resource before stopping and joining the driver.
    drop(publisher);
    drop(sub2);
    drop(sub);
    drop(aeron_pub);
    drop(aeron);
    drop(aeron_sub);
    stop.store(true, Ordering::SeqCst);
    let _ = md.join().unwrap();

    poll_outcome?;
    assert!(
        published,
        "timed out offering a message on the tagged publication"
    );
    Ok(())
}
1675
1676 fn create_client_for_dir(dir: &str) -> Result<(AeronContext, Aeron), Box<dyn Error>> {
1677 info!("creating aeron client [dir={}]", dir);
1678 let ctx = AeronContext::new()?;
1679 ctx.set_dir(&dir.into_c_string())?;
1680 let aeron = Aeron::new(&ctx)?;
1681 aeron.start()?;
1682 Ok((ctx, aeron))
1683 }
1684
1685 fn create_client(
1686 media_driver_ctx: &AeronDriverContext,
1687 ) -> Result<(AeronContext, Aeron), Box<dyn Error>> {
1688 let dir = media_driver_ctx.get_dir().to_string();
1689 create_client_for_dir(&dir)
1690 }
1691
1692 fn start_media_driver(
1693 instance: u64,
1694 ) -> Result<
1695 (
1696 AeronDriverContext,
1697 Arc<AtomicBool>,
1698 JoinHandle<Result<(), rusteron_media_driver::AeronCError>>,
1699 ),
1700 Box<dyn Error>,
1701 > {
1702 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
1703 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
1704 media_driver_ctx.set_dir_delete_on_start(true)?;
1705 media_driver_ctx.set_dir(
1706 &format!(
1707 "{}{}-{}",
1708 media_driver_ctx.get_dir(),
1709 Aeron::epoch_clock(),
1710 instance
1711 )
1712 .into_c_string(),
1713 )?;
1714 let (stop, driver_handle) =
1715 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1716 Ok((media_driver_ctx, stop, driver_handle))
1717 }
1718
// Empty module whose rustdoc text is the contents of `../../README.md`.
// NOTE(review): presumably meant to compile the README's code samples as
// doctests; confirm that doctests are actually collected from inside this
// `#[cfg(test)]` module.
#[doc = include_str!("../../README.md")]
mod readme_tests {}
1721}