1#![allow(non_upper_case_globals)]
2#![allow(non_camel_case_types)]
3#![allow(non_snake_case)]
4#![allow(clippy::all)]
5#![allow(unused_unsafe)]
6#![allow(unused_variables)]
7#![doc = include_str!("../README.md")]
8pub mod bindings {
18 include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
19}
20
21use bindings::*;
22
23include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
24include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
25
26#[cfg(test)]
27mod tests {
28 use super::*;
29 use crate::test_alloc::current_allocs;
30 use log::{error, info};
31 use rusteron_media_driver::AeronDriverContext;
32 use serial_test::serial;
33 use std::error;
34 use std::error::Error;
35 use std::io::Write;
36 use std::os::raw::c_int;
37 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
38 use std::sync::Arc;
39 use std::thread::{sleep, JoinHandle};
40 use std::time::{Duration, Instant};
41
42 #[derive(Default, Debug)]
43 struct ErrorCount {
44 error_count: usize,
45 }
46
47 impl AeronErrorHandlerCallback for ErrorCount {
48 fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
49 error!("Aeron error {}: {}", error_code, msg);
50 self.error_count += 1;
51 }
52 }
53
/// Verifies that the linked Aeron library reports the expected version and that
/// creating a context, installing an error handler and releasing both again does
/// not leave the allocation counter higher than where it started.
#[test]
#[serial]
fn version_check() -> Result<(), Box<dyn error::Error>> {
    // Invoked once before the allocation baseline is taken.
    // NOTE(review): presumably this triggers one-off allocations inside the C library so
    // they are not counted against the test — confirm.
    unsafe {
        aeron_randomised_int32();
    }
    let alloc_count = current_allocs();

    {
        let major = unsafe { crate::aeron_version_major() };
        let minor = unsafe { crate::aeron_version_minor() };
        let patch = unsafe { crate::aeron_version_patch() };

        let cargo_version = "1.48.6";
        let aeron_version = format!("{}.{}.{}", major, minor, patch);
        assert_eq!(aeron_version, cargo_version);

        let ctx = AeronContext::new()?;
        let mut handler = Handler::leak(ErrorCount::default());
        ctx.set_error_handler(Some(&handler))?;

        assert!(Aeron::epoch_clock() > 0);
        drop(ctx);
        // Dropping the context does not release the leaked handler: it is still flagged
        // `should_drop` until `release()` is called explicitly.
        assert!(handler.should_drop);
        handler.release();
        assert!(!handler.should_drop);
        drop(handler);
    }

    // Everything created in the scope above must have been freed again.
    assert!(
        current_allocs() <= alloc_count,
        "allocations {} > {alloc_count}",
        current_allocs()
    );

    Ok(())
}
92
/// Publishes messages of 100x the driver's IPC MTU length from a background thread over
/// an IPC publication and checks that the subscriber, polling through a fragment
/// assembler, receives every message complete and unaltered.
///
/// Passes once more than 100 messages have been counted. Returns a `TimedOut` I/O error
/// if that does not happen within 30 seconds. After the run it also dumps counters, the
/// error log and the loss report read from the CnC file.
#[test]
#[serial]
pub fn simple_large_send() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();
    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    // Suffix the default directory with the epoch clock so each run uses its own dir.
    media_driver_ctx.set_dir(
        &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
    )?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
    // Each callback is registered exactly once.
    ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
    ctx.set_on_new_publication(Some(&Handler::leak(AeronNewPublicationLogger)))?;
    ctx.set_on_available_counter(Some(&Handler::leak(AeronAvailableCounterLogger)))?;
    ctx.set_on_close_client(Some(&Handler::leak(AeronCloseClientLogger)))?;
    ctx.set_on_new_subscription(Some(&Handler::leak(AeronNewSubscriptionLogger)))?;
    ctx.set_on_unavailable_counter(Some(&Handler::leak(AeronUnavailableCounterLogger)))?;
    ctx.set_on_new_exclusive_publication(Some(&Handler::leak(AeronNewPublicationLogger)))?;

    info!("creating client [simple_large_send test]");
    let aeron = Aeron::new(&ctx)?;
    info!("starting client");

    aeron.start()?;
    info!("client started");
    let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
    info!("created publisher");

    // Exercise the different ways of reading the CnC file while the driver is running.
    assert!(AeronCncMetadata::load_from_file(ctx.get_dir())?.pid > 0);
    let cstr = std::ffi::CString::new(ctx.get_dir()).unwrap();
    AeronCncMetadata::read_from_file(&cstr, |cnc| {
        assert!(cnc.pid > 0);
    })?;
    assert!(AeronCnc::new_on_heap(ctx.get_dir())?.get_to_driver_heartbeat_ms()? > 0);
    let cstr = std::ffi::CString::new(ctx.get_dir()).unwrap();
    for _ in 0..50 {
        AeronCnc::read_on_partial_stack(&cstr, |cnc| {
            assert!(cnc.get_to_driver_heartbeat_ms().unwrap() > 0);
        })?;
    }

    let subscription = aeron.add_subscription(
        AERON_IPC_STREAM,
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(5),
    )?;
    info!("created subscription");

    // Smoke-test the closure based poll before any message has been offered.
    subscription
        .poll_once(|_msg, _header| println!("foo"), 1024)
        .unwrap();

    let string_len = media_driver_ctx.ipc_mtu_length * 100;
    info!("string length: {}", string_len);

    let publisher_handler = {
        let stop = stop.clone();
        std::thread::spawn(move || {
            let binding = "1".repeat(string_len);
            let large_msg = binding.as_bytes();
            loop {
                if stop.load(Ordering::Acquire) || publisher.is_closed() {
                    break;
                }
                let result =
                    publisher.offer(large_msg, Handlers::no_reserved_value_supplier_handler());

                assert_eq!(123, publisher.get_constants().unwrap().stream_id);

                // Any result smaller than the payload length is treated as a failed offer.
                if result < large_msg.len() as i64 {
                    let error = AeronCError::from_code(result as i32);
                    match error.kind() {
                        // Back pressure and admin actions are expected; retry silently.
                        AeronErrorType::PublicationBackPressured
                        | AeronErrorType::PublicationAdminAction => {}
                        _ => {
                            error!(
                                "ERROR: failed to send message {:?}",
                                AeronCError::from_code(result as i32)
                            );
                        }
                    }
                    sleep(Duration::from_millis(500));
                }
            }
            info!("stopping publisher thread");
        })
    };

    let mut assembler = AeronFragmentClosureAssembler::new()?;

    // State handed to `process_msg` for every reassembled message.
    struct Context {
        count: Arc<AtomicUsize>,
        stop: Arc<AtomicBool>,
        string_len: usize,
    }

    // Counts the message and checks its length and payload; on a length mismatch it first
    // raises the stop flag and logs before the assertions below fail the test.
    fn process_msg(ctx: &mut Context, buffer: &[u8], header: AeronHeader) {
        ctx.count.fetch_add(1, Ordering::SeqCst);

        let values = header.get_values().unwrap();
        assert_ne!(values.frame.session_id, 0);

        if buffer.len() != ctx.string_len {
            ctx.stop.store(true, Ordering::SeqCst);
            error!(
                "ERROR: message was {} but was expecting {} [header={:?}]",
                buffer.len(),
                ctx.string_len,
                header
            );
            sleep(Duration::from_secs(1));
        }

        assert_eq!(buffer.len(), ctx.string_len);
        assert_eq!(buffer, "1".repeat(ctx.string_len).as_bytes());
    }

    let count = Arc::new(AtomicUsize::new(0usize));
    let mut context = Context {
        count: count.clone(),
        stop: stop.clone(),
        string_len,
    };

    let start_time = Instant::now();

    loop {
        if start_time.elapsed() > Duration::from_secs(30) {
            info!("Failed: exceeded 30-second timeout");
            // Ask the publisher and driver threads to wind down; without this they would
            // keep running after the test has already failed.
            stop.store(true, Ordering::SeqCst);
            return Err(Box::new(std::io::Error::new(
                std::io::ErrorKind::TimedOut,
                "Timeout exceeded",
            )));
        }
        let c = count.load(Ordering::SeqCst);
        if c > 100 {
            break;
        }

        subscription.poll(assembler.process(&mut context, process_msg), 128)?;
        assert_eq!(123, subscription.get_constants().unwrap().stream_id);
    }

    subscription.close(Handlers::no_notification_handler())?;

    info!("stopping client");
    stop.store(true, Ordering::SeqCst);

    let _ = publisher_handler.join().unwrap();
    let _ = driver_handle.join().unwrap();

    // Dump diagnostics from the CnC file: counters, error log and loss report.
    let cnc = AeronCnc::new_on_heap(ctx.get_dir())?;
    cnc.counters_reader().foreach_counter_once(
        |value: i64, id: i32, type_id: i32, key: &[u8], label: &str| {
            println!("counter reader id={id}, type_id={type_id}, key={key:?}, label={label}, value={value} [type={:?}]",
                AeronSystemCounterType::try_from(type_id));
        },
    );
    cnc.error_log_read_once(
        |observation_count: i32,
         first_observation_timestamp: i64,
         last_observation_timestamp: i64,
         error: &str| {
            println!("error: {error} observationCount={observation_count}, first_observation_timestamp={first_observation_timestamp}, last_observation_timestamp={last_observation_timestamp}");
        },
        0,
    );
    cnc.loss_reporter_read_once(
        |observation_count: i64,
         total_bytes_lost: i64,
         first_observation_timestamp: i64,
         last_observation_timestamp: i64,
         session_id: i32,
         stream_id: i32,
         channel: &str,
         source: &str| {
            println!("loss reporter observationCount={observation_count}, totalBytesLost={total_bytes_lost}, first_observed={first_observation_timestamp}, last_observed={last_observation_timestamp}, session_id={session_id}, stream_id={stream_id}, channel={channel} source={source}");
        },
    )?;

    Ok(())
}
286
/// Publishes 156-byte messages through `try_claim` + `commit` from a background thread
/// and checks that a fragment-handler callback (wrapped in a fragment assembler) sees
/// each message with the expected stream id, length and payload.
///
/// Passes once more than 100 messages have been counted. Returns a `TimedOut` I/O error
/// if that does not happen within 30 seconds.
#[test]
#[serial]
pub fn try_claim() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();
    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    // Suffix the default directory with the epoch clock so each run uses its own dir.
    media_driver_ctx.set_dir(
        &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
    )?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
    ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;

    info!("creating client [try_claim test]");
    let aeron = Aeron::new(&ctx)?;
    info!("starting client");

    aeron.start()?;
    info!("client started");
    const STREAM_ID: i32 = 123;
    let publisher =
        aeron.add_publication(AERON_IPC_STREAM, STREAM_ID, Duration::from_secs(5))?;
    info!("created publisher");

    let subscription = aeron.add_subscription(
        AERON_IPC_STREAM,
        STREAM_ID,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(5),
    )?;
    info!("created subscription");

    let string_len = 156;
    info!("string length: {}", string_len);

    let publisher_handler = {
        let stop = stop.clone();
        std::thread::spawn(move || {
            let binding = "1".repeat(string_len);
            let msg = binding.as_bytes();
            let buffer = AeronBufferClaim::default();
            loop {
                if stop.load(Ordering::Acquire) || publisher.is_closed() {
                    break;
                }

                let result = publisher.try_claim(string_len, &buffer);

                // Any result smaller than the payload length is treated as a failed claim.
                if result < msg.len() as i64 {
                    error!(
                        "ERROR: failed to send message {:?}",
                        AeronCError::from_code(result as i32)
                    );
                } else {
                    // Fill the claimed region, then commit the claim.
                    buffer.data().write_all(msg).unwrap();
                    buffer.commit().unwrap();
                }
            }
            info!("stopping publisher thread");
        })
    };

    let count = Arc::new(AtomicUsize::new(0usize));
    let count_copy = Arc::clone(&count);
    let stop2 = stop.clone();

    // Fragment callback state: shared message counter, stop flag and expected length.
    struct FragmentHandler {
        count_copy: Arc<AtomicUsize>,
        stop2: Arc<AtomicBool>,
        string_len: usize,
    }

    impl AeronFragmentHandlerCallback for FragmentHandler {
        fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], header: AeronHeader) {
            // The stream id is checked through both the field and the accessor methods.
            assert_eq!(STREAM_ID, header.get_values().unwrap().frame.stream_id);
            let header = header.get_values().unwrap();
            let frame = header.frame();
            let stream_id = frame.stream_id();
            assert_eq!(STREAM_ID, stream_id);

            self.count_copy.fetch_add(1, Ordering::SeqCst);

            // On a length mismatch raise the stop flag and log before the assertions fail.
            if buffer.len() != self.string_len {
                self.stop2.store(true, Ordering::SeqCst);
                error!(
                    "ERROR: message was {} but was expecting {} [header={:?}]",
                    buffer.len(),
                    self.string_len,
                    header
                );
                sleep(Duration::from_secs(1));
            }

            assert_eq!(buffer.len(), self.string_len);
            assert_eq!(buffer, "1".repeat(self.string_len).as_bytes());
        }
    }

    // `_inner` is held for the rest of the test alongside the assembler closure.
    let (closure, _inner) = Handler::leak_with_fragment_assembler(FragmentHandler {
        count_copy,
        stop2,
        string_len,
    })?;
    let start_time = Instant::now();

    loop {
        if start_time.elapsed() > Duration::from_secs(30) {
            info!("Failed: exceeded 30-second timeout");
            // Ask the publisher and driver threads to wind down; without this they would
            // keep running after the test has already failed.
            stop.store(true, Ordering::SeqCst);
            return Err(Box::new(std::io::Error::new(
                std::io::ErrorKind::TimedOut,
                "Timeout exceeded",
            )));
        }
        let c = count.load(Ordering::SeqCst);
        if c > 100 {
            break;
        }
        subscription.poll(Some(&closure), 128)?;
    }

    info!("stopping client");

    stop.store(true, Ordering::SeqCst);

    let _ = publisher_handler.join().unwrap();
    let _ = driver_handle.join().unwrap();
    Ok(())
}
425
/// Registers a counter with key `"key"` and label `"label_buffer"`, increments it from a
/// background thread, and checks that the on-available-counter callback saw it and that
/// the counters reader returns the same key and label.
#[test]
#[serial]
pub fn counters() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();
    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    // Per-run directory: default dir suffixed with the epoch clock.
    let run_dir = format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock());
    media_driver_ctx.set_dir(&run_dir.into_c_string())?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
    ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
    ctx.set_on_unavailable_counter(Some(&Handler::leak(AeronUnavailableCounterLogger)))?;

    // Callback that records whether the counter labelled "label_buffer" was announced.
    struct AvailableCounterHandler {
        found_counter: bool,
    }

    impl AeronAvailableCounterCallback for AvailableCounterHandler {
        fn handle_aeron_on_available_counter(
            &mut self,
            counters_reader: AeronCountersReader,
            registration_id: i64,
            counter_id: i32,
        ) {
            info!(
                "on counter key={:?}, label={:?} registration_id={registration_id}, counter_id={counter_id}, value={}, {counters_reader:?}",
                String::from_utf8(counters_reader.get_counter_key(counter_id).unwrap()),
                counters_reader.get_counter_label(counter_id, 1000),
                counters_reader.addr(counter_id)
            );

            // The reader must agree with the registration id passed to the callback.
            assert_eq!(
                counters_reader.counter_registration_id(counter_id).unwrap(),
                registration_id
            );

            match counters_reader.get_counter_label(counter_id, 1000) {
                Ok(label) if label == "label_buffer" => {
                    self.found_counter = true;
                    assert_eq!(
                        &counters_reader.get_counter_key(counter_id).unwrap(),
                        "key".as_bytes()
                    );
                }
                _ => {}
            }
        }
    }

    let handler = Handler::leak(AvailableCounterHandler {
        found_counter: false,
    });
    ctx.set_on_available_counter(Some(&handler))?;

    info!("creating client");
    let aeron = Aeron::new(&ctx)?;
    info!("starting client");

    aeron.start()?;
    info!("client started [counters test]");

    let counter = aeron.add_counter(
        123,
        "key".as_bytes(),
        "label_buffer",
        Duration::from_secs(5),
    )?;
    let counter_id = counter.get_constants()?.counter_id;

    // Background thread: up to 150 increments, bailing out early on stop or close.
    let publisher_handler = {
        let stop = stop.clone();
        let counter = counter.clone();
        std::thread::spawn(move || {
            let mut remaining = 150;
            while remaining > 0 {
                if stop.load(Ordering::Acquire) || counter.is_closed() {
                    break;
                }
                counter.addr_atomic().fetch_add(1, Ordering::SeqCst);
                remaining -= 1;
            }
            info!("stopping publisher thread");
        })
    };

    // Wait until the counter reaches 100, giving up after 10 seconds.
    let now = Instant::now();
    let limit = Duration::from_secs(10);
    loop {
        if counter.addr_atomic().load(Ordering::SeqCst) >= 100 {
            break;
        }
        if now.elapsed() >= limit {
            break;
        }
        sleep(Duration::from_micros(10));
    }

    assert!(now.elapsed() < limit);

    info!(
        "counter is {}",
        counter.addr_atomic().load(Ordering::SeqCst)
    );

    info!("stopping client");

    // The callback check is compiled out on Windows.
    #[cfg(not(target_os = "windows"))]
    assert!(handler.found_counter);

    stop.store(true, Ordering::SeqCst);

    let reader = aeron.counters_reader();
    assert_eq!(reader.get_counter_label(counter_id, 256)?, "label_buffer");
    assert_eq!(reader.get_counter_key(counter_id)?, "key".as_bytes());
    let buffers = AeronCountersReaderBuffers::default();
    reader.get_buffers(&buffers)?;

    let _ = publisher_handler.join().unwrap();
    let _ = driver_handle.join().unwrap();
    Ok(())
}
549
550 #[derive(Default, Debug)]
552 struct TestErrorCount {
553 pub error_count: usize,
554 }
555
556 impl Drop for TestErrorCount {
557 fn drop(&mut self) {
558 info!("TestErrorCount dropped with {} errors", self.error_count);
559 }
560 }
561
562 impl AeronErrorHandlerCallback for TestErrorCount {
563 fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
564 error!("Aeron error {}: {}", error_code, msg);
565 self.error_count += 1;
566 }
567 }
568
/// Offers a small message in a tight loop from a background thread, backing off for
/// 50 ms whenever the offer reports back pressure, and expects the subscriber to have
/// received at least 50 messages within 10 seconds.
#[test]
#[serial]
pub fn backpressure_recovery_test() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    // Per-run directory: default dir suffixed with the epoch clock.
    let run_dir = format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock());
    media_driver_ctx.set_dir(&run_dir.into_c_string())?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;

    let aeron = Aeron::new(&ctx)?;
    aeron.start()?;

    let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
    let subscription = aeron.add_subscription(
        AERON_IPC_STREAM,
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(5),
    )?;

    let count = Arc::new(AtomicUsize::new(0));
    let start_time = Instant::now();

    let publisher_thread = {
        let stop = stop.clone();
        std::thread::spawn(move || loop {
            if stop.load(Ordering::Acquire) {
                break;
            }
            let outcome =
                publisher.offer(b"test", Handlers::no_reserved_value_supplier_handler());
            // Only back pressure triggers a pause; every other outcome retries at once.
            if outcome == AeronErrorType::PublicationBackPressured.code() as i64 {
                sleep(Duration::from_millis(50));
            }
        })
    };

    // Poll until 50 messages have arrived or 10 seconds have passed.
    loop {
        if count.load(Ordering::SeqCst) >= 50 {
            break;
        }
        if start_time.elapsed() >= Duration::from_secs(10) {
            break;
        }
        let _ = subscription.poll_once(
            |_msg, _header| {
                count.fetch_add(1, Ordering::SeqCst);
            },
            128,
        )?;
    }

    stop.store(true, Ordering::SeqCst);
    publisher_thread.join().unwrap();
    let _ = driver_handle.join().unwrap();

    let received = count.load(Ordering::SeqCst);
    assert!(received >= 50, "Expected at least 50 messages received");
    Ok(())
}
641
/// Offers a single message on an IPC stream that has two subscriptions and expects
/// both of them to receive it within 5 seconds.
#[test]
#[serial]
pub fn multi_subscription_test() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    // Per-run directory: default dir suffixed with the epoch clock.
    let run_dir = format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock());
    media_driver_ctx.set_dir(&run_dir.into_c_string())?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;

    let aeron = Aeron::new(&ctx)?;
    aeron.start()?;
    let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;

    // Both subscriptions are created with exactly the same arguments.
    let subscribe = || {
        aeron.add_subscription(
            AERON_IPC_STREAM,
            123,
            Handlers::no_available_image_handler(),
            Handlers::no_unavailable_image_handler(),
            Duration::from_secs(5),
        )
    };
    let subscription1 = subscribe()?;
    let subscription2 = subscribe()?;

    let count1 = Arc::new(AtomicUsize::new(0));
    let count2 = Arc::new(AtomicUsize::new(0));

    let payload = b"hello multi-subscription";
    let offer_result = publisher.offer(payload, Handlers::no_reserved_value_supplier_handler());
    assert!(
        offer_result >= payload.len() as i64,
        "Message should be sent successfully"
    );

    // Poll both subscriptions until each has seen a message or 5 seconds have passed.
    let start_time = Instant::now();
    loop {
        let both_received =
            count1.load(Ordering::SeqCst) >= 1 && count2.load(Ordering::SeqCst) >= 1;
        if both_received || start_time.elapsed() >= Duration::from_secs(5) {
            break;
        }
        let _ = subscription1.poll_once(
            |_msg, _header| {
                count1.fetch_add(1, Ordering::SeqCst);
            },
            128,
        )?;
        let _ = subscription2.poll_once(
            |_msg, _header| {
                count2.fetch_add(1, Ordering::SeqCst);
            },
            128,
        )?;
    }

    assert!(
        count1.load(Ordering::SeqCst) >= 1,
        "Subscription 1 did not receive the message"
    );
    assert!(
        count2.load(Ordering::SeqCst) >= 1,
        "Subscription 2 did not receive the message"
    );

    stop.store(true, Ordering::SeqCst);
    let _ = driver_handle.join().unwrap();
    Ok(())
}
726
/// Checks that a publication can be dropped after it has already been closed by hand,
/// once for each of the three close variants: `close_with_no_args`, `close` with the
/// no-op notification handler, and `close_once` with a closure.
#[test]
#[serial]
pub fn should_be_able_to_drop_after_close_manually_being_closed(
) -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    // Suffix the default directory with the epoch clock so each run uses its own dir.
    media_driver_ctx.set_dir(
        &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
    )?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    ctx.set_error_handler(Some(&Handler::leak(AeronErrorHandlerLogger)))?;

    let aeron = Aeron::new(&ctx)?;
    aeron.start()?;

    // Variant 1: close without a notification callback.
    {
        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
        info!("created publication [sessionId={}]", publisher.session_id());
        publisher.close_with_no_args()?;
        drop(publisher);
    }

    // Variant 2: close with the no-op notification handler.
    {
        let publisher = aeron.add_publication(AERON_IPC_STREAM, 124, Duration::from_secs(5))?;
        info!("created publication [sessionId={}]", publisher.session_id());
        publisher.close(Handlers::no_notification_handler())?;
        drop(publisher);
    }

    // Variant 3: close with a one-shot closure. The session id is read before closing,
    // as in the two scopes above, so the publication is not queried once it is closed.
    {
        let publisher = aeron.add_publication(AERON_IPC_STREAM, 125, Duration::from_secs(5))?;
        info!("created publication [sessionId={}]", publisher.session_id());
        publisher.close_once(|| println!("on close"))?;
        drop(publisher);
    }

    // Shut the embedded driver down like the other tests do, instead of leaving its
    // thread running after this test returns.
    stop.store(true, Ordering::SeqCst);
    let _ = driver_handle.join().unwrap();

    Ok(())
}
775
/// Closes a publication and then offers on it, expecting the offer to report a
/// negative error code.
#[test]
#[serial]
pub fn offer_on_closed_publication_error_test() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    // Per-run directory: default dir suffixed with the epoch clock.
    let run_dir = format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock());
    media_driver_ctx.set_dir(&run_dir.into_c_string())?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;

    let aeron = Aeron::new(&ctx)?;
    aeron.start()?;
    let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;

    publisher.close(Handlers::no_notification_handler())?;

    // The publication is closed at this point, so the offer is expected to fail.
    let offer_result = publisher.offer(
        b"should fail",
        Handlers::no_reserved_value_supplier_handler(),
    );
    assert!(
        offer_result < 0,
        "Offering on a closed publication should return a negative error code"
    );

    stop.store(true, Ordering::SeqCst);
    let _ = driver_handle.join().unwrap();
    Ok(())
}
818
/// Offers a zero-length message and expects the subscriber to receive an empty
/// message within 5 seconds.
#[test]
#[serial]
pub fn empty_message_test() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    // Per-run directory: default dir suffixed with the epoch clock.
    let run_dir = format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock());
    media_driver_ctx.set_dir(&run_dir.into_c_string())?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;

    let aeron = Aeron::new(&ctx)?;
    aeron.start()?;
    let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
    let subscription = aeron.add_subscription(
        AERON_IPC_STREAM,
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(5),
    )?;

    let empty_received = Arc::new(AtomicBool::new(false));
    let start_time = Instant::now();

    // An empty payload must still yield a positive offer result.
    let offer_result = publisher.offer(b"", Handlers::no_reserved_value_supplier_handler());
    assert!(offer_result > 0);

    // Poll until an empty message shows up or 5 seconds have passed.
    loop {
        if empty_received.load(Ordering::SeqCst) {
            break;
        }
        if start_time.elapsed() >= Duration::from_secs(5) {
            break;
        }
        let _ = subscription.poll_once(
            |msg, _header| {
                if msg.is_empty() {
                    empty_received.store(true, Ordering::SeqCst);
                }
            },
            128,
        )?;
    }

    assert!(
        empty_received.load(Ordering::SeqCst),
        "Empty message was not received"
    );
    stop.store(true, Ordering::SeqCst);
    let _ = driver_handle.join().unwrap();
    Ok(())
}
879
/// Ignored by default. Adds two subscriptions on `aeron:udp?tags=100`, publishes one
/// message on a UDP publication carrying the same tag, and polls both subscriptions once.
///
/// Returns a `TimedOut` I/O error if the message cannot be offered within 30 seconds.
#[test]
#[serial]
#[ignore]
pub fn tags() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Debug)
        .try_init();

    let (md_ctx, stop, md) = start_media_driver(1)?;

    let (_a_ctx2, aeron_sub) = create_client(&md_ctx)?;

    info!("creating suscriber 1");
    let sub = aeron_sub
        .add_subscription(
            &"aeron:udp?tags=100".into_c_string(),
            123,
            Handlers::no_available_image_handler(),
            Handlers::no_unavailable_image_handler(),
            Duration::from_secs(50),
        )
        .map_err(|e| {
            error!("aeron error={}", Aeron::errmsg());
            e
        })?;

    // NOTE(review): this second client is started but never used afterwards — `sub2`
    // below is still added through `aeron_sub`. Confirm whether `sub2` was meant to be
    // created on this client instead.
    let ctx = AeronContext::new()?;
    ctx.set_dir(&aeron_sub.context().get_dir().into_c_string())?;
    let aeron = Aeron::new(&ctx)?;
    aeron.start()?;

    info!("creating suscriber 2");
    let sub2 = aeron_sub.add_subscription(
        &"aeron:udp?tags=100".into_c_string(),
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(50),
    )?;

    let (_a_ctx1, aeron_pub) = create_client(&md_ctx)?;
    info!("creating publisher");
    assert!(!aeron_pub.is_closed());
    let publisher = aeron_pub
        .add_publication(
            &"aeron:udp?endpoint=localhost:4040|alias=test|tags=100".into_c_string(),
            123,
            Duration::from_secs(5),
        )
        .map_err(|e| {
            error!("aeron error={}", Aeron::errmsg());
            e
        })?;

    info!("publishing msg");

    // Retry the offer until it is accepted, but give up after 30 seconds rather than
    // spinning forever when the publication never becomes usable.
    let publish_started = Instant::now();
    loop {
        let result = publisher.offer(
            "213".as_bytes(),
            Handlers::no_reserved_value_supplier_handler(),
        );
        if result >= 0 {
            break;
        }
        error!(
            "failed to publish {:?}",
            AeronCError::from_code(result as i32)
        );
        if publish_started.elapsed() > Duration::from_secs(30) {
            // Let the driver thread wind down before reporting the failure.
            stop.store(true, Ordering::SeqCst);
            return Err(Box::new(std::io::Error::new(
                std::io::ErrorKind::TimedOut,
                "Timeout exceeded",
            )));
        }
    }

    sub.poll_once(
        |msg, _header| {
            println!("Received message: {:?}", msg);
        },
        128,
    )?;
    sub2.poll_once(
        |msg, _header| {
            println!("Received message: {:?}", msg);
        },
        128,
    )?;

    // Signal the driver to stop and wait for its thread, as the other tests do.
    stop.store(true, Ordering::SeqCst);
    let _ = md.join().unwrap();

    Ok(())
}
969
970 fn create_client(
971 media_driver_ctx: &AeronDriverContext,
972 ) -> Result<(AeronContext, Aeron), Box<dyn Error>> {
973 let dir = media_driver_ctx.get_dir();
974 info!("creating aeron client [dir={}]", dir);
975 let ctx = AeronContext::new()?;
976 ctx.set_dir(&dir.into_c_string())?;
977 ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
978 let aeron = Aeron::new(&ctx)?;
979 aeron.start()?;
980 Ok((ctx, aeron))
981 }
982
983 fn start_media_driver(
984 instance: u64,
985 ) -> Result<
986 (
987 AeronDriverContext,
988 Arc<AtomicBool>,
989 JoinHandle<Result<(), rusteron_media_driver::AeronCError>>,
990 ),
991 Box<dyn Error>,
992 > {
993 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
994 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
995 media_driver_ctx.set_dir_delete_on_start(true)?;
996 media_driver_ctx.set_dir(
997 &format!(
998 "{}{}-{}",
999 media_driver_ctx.get_dir(),
1000 Aeron::epoch_clock(),
1001 instance
1002 )
1003 .into_c_string(),
1004 )?;
1005 let (stop, driver_handle) =
1006 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1007 Ok((media_driver_ctx, stop, driver_handle))
1008 }
1009
1010 #[doc = include_str!("../../README.md")]
1011 mod readme_tests {}
1012}