1#![allow(non_upper_case_globals)]
2#![allow(non_camel_case_types)]
3#![allow(non_snake_case)]
4#![allow(clippy::all)]
5#![allow(unused_unsafe)]
6#![allow(unused_variables)]
7#![doc = include_str!("../README.md")]
8pub mod bindings {
17 include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
18}
19
20use bindings::*;
21
22include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
23include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
24
25#[cfg(test)]
26mod tests {
27 use super::*;
28 use crate::test_alloc::current_allocs;
29 use log::{error, info};
30 use rusteron_media_driver::AeronDriverContext;
31 use serial_test::serial;
32 use std::error;
33 use std::error::Error;
34 use std::io::Write;
35 use std::os::raw::c_int;
36 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
37 use std::sync::Arc;
38 use std::thread::{sleep, JoinHandle};
39 use std::time::{Duration, Instant};
40
41 #[derive(Default, Debug)]
42 struct ErrorCount {
43 error_count: usize,
44 }
45
46 impl AeronErrorHandlerCallback for ErrorCount {
47 fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
48 error!("Aeron error {}: {}", error_code, msg);
49 self.error_count += 1;
50 }
51 }
52
/// Verifies that the linked Aeron library reports the expected version and
/// that creating, configuring and dropping a context (plus releasing its
/// leaked error handler) does not raise the count reported by `current_allocs`.
#[test]
#[serial]
fn version_check() -> Result<(), Box<dyn error::Error>> {
    // Invoked before the baseline is sampled; presumably forces one-time
    // library initialisation so it is not counted below — TODO confirm.
    unsafe {
        aeron_randomised_int32();
    }
    let alloc_count = current_allocs();

    {
        // The version getters take no arguments; they are only `unsafe`
        // because they are foreign functions.
        let major = unsafe { crate::aeron_version_major() };
        let minor = unsafe { crate::aeron_version_minor() };
        let patch = unsafe { crate::aeron_version_patch() };

        let cargo_version = "1.48.5";
        let aeron_version = format!("{}.{}.{}", major, minor, patch);
        assert_eq!(aeron_version, cargo_version);

        let ctx = AeronContext::new()?;
        let mut handler = Handler::leak(ErrorCount::default());
        ctx.set_error_handler(Some(&handler))?;

        assert!(Aeron::epoch_clock() > 0);
        drop(ctx);

        // A leaked handler is flagged for manual release; `release` clears the flag.
        assert!(handler.should_drop);
        handler.release();
        assert!(!handler.should_drop);
        drop(handler);
    }

    assert!(
        current_allocs() <= alloc_count,
        "allocations {} > {alloc_count}",
        current_allocs()
    );

    Ok(())
}
91
/// Publishes messages of `ipc_mtu_length * 100` bytes over IPC from a
/// background thread and reassembles them on the subscriber side, checking the
/// length and content of every message until more than 100 have arrived.
///
/// Returns a `TimedOut` I/O error if that does not happen within 30 seconds.
#[test]
#[serial]
pub fn simple_large_send() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    // Embedded media driver running in its own, timestamp-suffixed directory.
    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    media_driver_ctx.set_dir(
        &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
    )?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    // Client context pointed at the driver directory, with logging callbacks.
    // Each callback is registered exactly once.
    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
    ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
    ctx.set_on_new_publication(Some(&Handler::leak(AeronNewPublicationLogger)))?;
    ctx.set_on_available_counter(Some(&Handler::leak(AeronAvailableCounterLogger)))?;
    ctx.set_on_close_client(Some(&Handler::leak(AeronCloseClientLogger)))?;
    ctx.set_on_new_subscription(Some(&Handler::leak(AeronNewSubscriptionLogger)))?;
    ctx.set_on_unavailable_counter(Some(&Handler::leak(AeronUnavailableCounterLogger)))?;
    ctx.set_on_new_exclusive_publication(Some(&Handler::leak(AeronNewPublicationLogger)))?;

    info!("creating client [simple_large_send test]");
    let aeron = Aeron::new(&ctx)?;
    info!("starting client");

    aeron.start()?;
    info!("client started");
    let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
    info!("created publisher");

    // Exercise the CnC readers against the running driver.
    assert!(AeronCncMetadata::load_from_file(ctx.get_dir())?.pid > 0);
    let cnc_dir = std::ffi::CString::new(ctx.get_dir()).unwrap();
    AeronCncMetadata::read_from_file(&cnc_dir, |cnc| {
        assert!(cnc.pid > 0);
    })?;
    assert!(AeronCnc::new_on_heap(ctx.get_dir())?.get_to_driver_heartbeat_ms()? > 0);
    for _ in 0..50 {
        AeronCnc::read_on_partial_stack(&cnc_dir, |cnc| {
            assert!(cnc.get_to_driver_heartbeat_ms().unwrap() > 0);
        })?;
    }

    let subscription = aeron.add_subscription(
        AERON_IPC_STREAM,
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(5),
    )?;
    info!("created subscription");

    subscription
        .poll_once(|_msg, _header| println!("foo"), 1024)
        .unwrap();

    let string_len = media_driver_ctx.ipc_mtu_length * 100;
    info!("string length: {}", string_len);

    // Background publisher: offers the large message until told to stop.
    let publisher_handler = {
        let stop = stop.clone();
        std::thread::spawn(move || {
            let binding = "1".repeat(string_len);
            let large_msg = binding.as_bytes();
            loop {
                if stop.load(Ordering::Acquire) || publisher.is_closed() {
                    break;
                }
                let result =
                    publisher.offer(large_msg, Handlers::no_reserved_value_supplier_handler());

                assert_eq!(123, publisher.get_constants().unwrap().stream_id);

                if result < large_msg.len() as i64 {
                    let error = AeronCError::from_code(result as i32);
                    match error.kind() {
                        // Back pressure and admin actions are expected; just retry.
                        AeronErrorType::PublicationBackPressured
                        | AeronErrorType::PublicationAdminAction => {}
                        _ => {
                            error!(
                                "ERROR: failed to send message {:?}",
                                AeronCError::from_code(result as i32)
                            );
                        }
                    }
                    sleep(Duration::from_millis(500));
                }
            }
            info!("stopping publisher thread");
        })
    };

    let mut assembler = AeronFragmentClosureAssembler::new()?;

    /// State handed to `process_msg` for every reassembled message.
    struct Context {
        count: Arc<AtomicUsize>,
        stop: Arc<AtomicBool>,
        string_len: usize,
    }

    /// Counts the message and checks its length and content; on a length
    /// mismatch it raises the stop flag and logs before the assertions fail.
    fn process_msg(ctx: &mut Context, buffer: &[u8], header: AeronHeader) {
        ctx.count.fetch_add(1, Ordering::SeqCst);

        let values = header.get_values().unwrap();
        assert_ne!(values.frame.session_id, 0);

        if buffer.len() != ctx.string_len {
            ctx.stop.store(true, Ordering::SeqCst);
            error!(
                "ERROR: message was {} but was expecting {} [header={:?}]",
                buffer.len(),
                ctx.string_len,
                header
            );
            sleep(Duration::from_secs(1));
        }

        assert_eq!(buffer.len(), ctx.string_len);
        assert_eq!(buffer, "1".repeat(ctx.string_len).as_bytes());
    }

    let count = Arc::new(AtomicUsize::new(0usize));
    let mut context = Context {
        count: count.clone(),
        stop: stop.clone(),
        string_len,
    };

    let start_time = Instant::now();

    loop {
        if start_time.elapsed() > Duration::from_secs(30) {
            info!("Failed: exceeded 30-second timeout");
            // Tell the driver and publisher threads to wind down so they do
            // not keep running after this test has failed.
            stop.store(true, Ordering::SeqCst);
            return Err(Box::new(std::io::Error::new(
                std::io::ErrorKind::TimedOut,
                "Timeout exceeded",
            )));
        }
        if count.load(Ordering::SeqCst) > 100 {
            break;
        }

        subscription.poll(assembler.process(&mut context, process_msg), 128)?;
        assert_eq!(123, subscription.get_constants().unwrap().stream_id);
    }

    subscription.close(Handlers::no_notification_handler())?;

    info!("stopping client");
    stop.store(true, Ordering::SeqCst);

    let _ = publisher_handler.join().unwrap();
    let _ = driver_handle.join().unwrap();

    // Dump counters, the error log and the loss report.
    let cnc = AeronCnc::new_on_heap(ctx.get_dir())?;
    cnc.counters_reader().foreach_counter_once(
        |value: i64, id: i32, type_id: i32, key: &[u8], label: &str| {
            println!("counter reader id={id}, type_id={type_id}, key={key:?}, label={label}, value={value} [type={:?}]",
                AeronSystemCounterType::try_from(type_id));
        },
    );
    cnc.error_log_read_once(
        |observation_count: i32,
         first_observation_timestamp: i64,
         last_observation_timestamp: i64,
         error: &str| {
            println!("error: {error} observationCount={observation_count}, first_observation_timestamp={first_observation_timestamp}, last_observation_timestamp={last_observation_timestamp}");
        },
        0,
    );
    cnc.loss_reporter_read_once(
        |observation_count: i64,
         total_bytes_lost: i64,
         first_observation_timestamp: i64,
         last_observation_timestamp: i64,
         session_id: i32,
         stream_id: i32,
         channel: &str,
         source: &str| {
            println!("loss reporter observationCount={observation_count}, totalBytesLost={total_bytes_lost}, first_observed={first_observation_timestamp}, last_observed={last_observation_timestamp}, session_id={session_id}, stream_id={stream_id}, channel={channel} source={source}");
        },
    )?;

    Ok(())
}
285
/// Publishes 156-byte messages over IPC via `try_claim` + `commit` from a
/// background thread and checks each received message's length and content
/// until more than 100 have arrived.
///
/// Returns a `TimedOut` I/O error if that does not happen within 30 seconds.
#[test]
#[serial]
pub fn try_claim() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    // Embedded media driver running in its own, timestamp-suffixed directory.
    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    media_driver_ctx.set_dir(
        &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
    )?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
    ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;

    info!("creating client [try_claim test]");
    let aeron = Aeron::new(&ctx)?;
    info!("starting client");

    aeron.start()?;
    info!("client started");
    let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
    info!("created publisher");

    let subscription = aeron.add_subscription(
        AERON_IPC_STREAM,
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(5),
    )?;
    info!("created subscription");

    let string_len = 156;
    info!("string length: {}", string_len);

    // Background publisher: claims space, writes the payload, then commits.
    let publisher_handler = {
        let stop = stop.clone();
        std::thread::spawn(move || {
            let binding = "1".repeat(string_len);
            let msg = binding.as_bytes();
            let buffer = AeronBufferClaim::default();
            loop {
                if stop.load(Ordering::Acquire) || publisher.is_closed() {
                    break;
                }

                let result = publisher.try_claim(string_len, &buffer);

                if result < msg.len() as i64 {
                    error!(
                        "ERROR: failed to send message {:?}",
                        AeronCError::from_code(result as i32)
                    );
                } else {
                    buffer.data().write_all(msg).unwrap();
                    buffer.commit().unwrap();
                }
            }
            info!("stopping publisher thread");
        })
    };

    let count = Arc::new(AtomicUsize::new(0usize));
    let count_copy = Arc::clone(&count);
    let stop2 = stop.clone();

    /// Fragment callback that counts messages and validates their payload.
    struct FragmentHandler {
        count_copy: Arc<AtomicUsize>,
        stop2: Arc<AtomicBool>,
        string_len: usize,
    }

    impl AeronFragmentHandlerCallback for FragmentHandler {
        fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], header: AeronHeader) {
            self.count_copy.fetch_add(1, Ordering::SeqCst);

            // On a length mismatch, stop the publisher and log before the
            // assertions below fail.
            if buffer.len() != self.string_len {
                self.stop2.store(true, Ordering::SeqCst);
                error!(
                    "ERROR: message was {} but was expecting {} [header={:?}]",
                    buffer.len(),
                    self.string_len,
                    header
                );
                sleep(Duration::from_secs(1));
            }

            assert_eq!(buffer.len(), self.string_len);
            assert_eq!(buffer, "1".repeat(self.string_len).as_bytes());
        }
    }

    let (closure, _inner) = Handler::leak_with_fragment_assembler(FragmentHandler {
        count_copy,
        stop2,
        string_len,
    })?;
    let start_time = Instant::now();

    loop {
        if start_time.elapsed() > Duration::from_secs(30) {
            info!("Failed: exceeded 30-second timeout");
            // Tell the driver and publisher threads to wind down so they do
            // not keep running after this test has failed.
            stop.store(true, Ordering::SeqCst);
            return Err(Box::new(std::io::Error::new(
                std::io::ErrorKind::TimedOut,
                "Timeout exceeded",
            )));
        }
        if count.load(Ordering::SeqCst) > 100 {
            break;
        }
        subscription.poll(Some(&closure), 128)?;
    }

    info!("stopping client");

    stop.store(true, Ordering::SeqCst);

    let _ = publisher_handler.join().unwrap();
    let _ = driver_handle.join().unwrap();
    Ok(())
}
416
/// Adds a counter with key `"key"` and label `"label_buffer"`, bumps it from a
/// background thread, and checks that the available-counter callback saw it
/// (skipped on Windows) and that the counters reader returns the same key and label.
#[test]
#[serial]
pub fn counters() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    // Embedded media driver running in its own, timestamp-suffixed directory.
    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    let driver_dir = format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock());
    media_driver_ctx.set_dir(&driver_dir.into_c_string())?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
    ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
    ctx.set_on_unavailable_counter(Some(&Handler::leak(AeronUnavailableCounterLogger)))?;

    /// Remembers whether a counter labelled `label_buffer` was announced.
    struct AvailableCounterHandler {
        found_counter: bool,
    }

    impl AeronAvailableCounterCallback for AvailableCounterHandler {
        fn handle_aeron_on_available_counter(
            &mut self,
            counters_reader: AeronCountersReader,
            registration_id: i64,
            counter_id: i32,
        ) {
            info!(
                "on counter key={:?}, label={:?} registration_id={registration_id}, counter_id={counter_id}, value={}, {counters_reader:?}",
                String::from_utf8(counters_reader.get_counter_key(counter_id).unwrap()),
                counters_reader.get_counter_label(counter_id, 1000),
                counters_reader.addr(counter_id)
            );

            assert_eq!(
                counters_reader.counter_registration_id(counter_id).unwrap(),
                registration_id
            );

            // Only the counter created by this test is of interest.
            match counters_reader.get_counter_label(counter_id, 1000) {
                Ok(label) if label == "label_buffer" => {
                    self.found_counter = true;
                    assert_eq!(
                        &counters_reader.get_counter_key(counter_id).unwrap(),
                        "key".as_bytes()
                    );
                }
                _ => {}
            }
        }
    }

    let handler = Handler::leak(AvailableCounterHandler {
        found_counter: false,
    });
    ctx.set_on_available_counter(Some(&handler))?;

    info!("creating client");
    let aeron = Aeron::new(&ctx)?;
    info!("starting client");

    aeron.start()?;
    info!("client started [counters test]");

    let counter = aeron.add_counter(
        123,
        "key".as_bytes(),
        "label_buffer",
        Duration::from_secs(5),
    )?;
    let counter_id = counter.get_constants()?.counter_id;

    // Background thread bumps the counter up to 150 times unless stopped.
    let publisher_handler = {
        let stop = stop.clone();
        let counter = counter.clone();
        std::thread::spawn(move || {
            for _ in 0..150 {
                if stop.load(Ordering::Acquire) || counter.is_closed() {
                    break;
                }
                counter.addr_atomic().fetch_add(1, Ordering::SeqCst);
            }
            info!("stopping publisher thread");
        })
    };

    // Wait (at most 10 seconds) for the counter to reach 100.
    let wait_limit = Duration::from_secs(10);
    let now = Instant::now();
    while counter.addr_atomic().load(Ordering::SeqCst) < 100 && now.elapsed() < wait_limit {
        sleep(Duration::from_micros(10));
    }

    assert!(now.elapsed() < Duration::from_secs(10));

    info!(
        "counter is {}",
        counter.addr_atomic().load(Ordering::SeqCst)
    );

    info!("stopping client");

    #[cfg(not(target_os = "windows"))]
    assert!(handler.found_counter);

    stop.store(true, Ordering::SeqCst);

    let reader = aeron.counters_reader();
    assert_eq!(reader.get_counter_label(counter_id, 256)?, "label_buffer");
    assert_eq!(reader.get_counter_key(counter_id)?, "key".as_bytes());
    let buffers = AeronCountersReaderBuffers::default();
    reader.get_buffers(&buffers)?;

    let _ = publisher_handler.join().unwrap();
    let _ = driver_handle.join().unwrap();
    Ok(())
}
540
541 #[derive(Default, Debug)]
543 struct TestErrorCount {
544 pub error_count: usize,
545 }
546
547 impl Drop for TestErrorCount {
548 fn drop(&mut self) {
549 info!("TestErrorCount dropped with {} errors", self.error_count);
550 }
551 }
552
553 impl AeronErrorHandlerCallback for TestErrorCount {
554 fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
555 error!("Aeron error {}: {}", error_code, msg);
556 self.error_count += 1;
557 }
558 }
559
/// Floods an IPC publication from a background thread (pausing 50 ms whenever
/// the offer reports back pressure) and expects the subscriber to receive at
/// least 50 messages within 10 seconds.
#[test]
#[serial]
pub fn backpressure_recovery_test() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    // Embedded media driver running in its own, timestamp-suffixed directory.
    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    let driver_dir = format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock());
    media_driver_ctx.set_dir(&driver_dir.into_c_string())?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;

    let aeron = Aeron::new(&ctx)?;
    aeron.start()?;

    let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
    let subscription = aeron.add_subscription(
        AERON_IPC_STREAM,
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(5),
    )?;

    let count = Arc::new(AtomicUsize::new(0));
    let start_time = Instant::now();

    // Publisher keeps offering until the stop flag is raised.
    let publisher_thread = {
        let stop = stop.clone();
        std::thread::spawn(move || {
            let back_pressured = AeronErrorType::PublicationBackPressured.code() as i64;
            loop {
                if stop.load(Ordering::Acquire) {
                    break;
                }
                let msg = b"test";
                let result = publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
                if result == back_pressured {
                    sleep(Duration::from_millis(50));
                }
            }
        })
    };

    // Drain the subscription until enough messages arrived or time ran out.
    let time_limit = Duration::from_secs(10);
    while count.load(Ordering::SeqCst) < 50 && start_time.elapsed() < time_limit {
        let _ = subscription.poll_once(
            |_msg, _header| {
                count.fetch_add(1, Ordering::SeqCst);
            },
            128,
        )?;
    }

    stop.store(true, Ordering::SeqCst);
    publisher_thread.join().unwrap();
    let _ = driver_handle.join().unwrap();

    let received = count.load(Ordering::SeqCst);
    assert!(received >= 50, "Expected at least 50 messages received");
    Ok(())
}
632
/// Publishes one message on an IPC stream that has two subscriptions and
/// expects both subscriptions to receive it within 5 seconds.
#[test]
#[serial]
pub fn multi_subscription_test() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    // Embedded media driver running in its own, timestamp-suffixed directory.
    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    media_driver_ctx.set_dir(
        &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
    )?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;

    let aeron = Aeron::new(&ctx)?;
    aeron.start()?;
    let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;

    let subscription1 = aeron.add_subscription(
        AERON_IPC_STREAM,
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(5),
    )?;
    let subscription2 = aeron.add_subscription(
        AERON_IPC_STREAM,
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(5),
    )?;

    let count1 = Arc::new(AtomicUsize::new(0));
    let count2 = Arc::new(AtomicUsize::new(0));

    let msg = b"hello multi-subscription";
    // `offer` reports transient conditions such as back pressure as negative
    // codes, which mean nothing was published; retry for a bounded time
    // instead of failing on the very first attempt.
    let offer_deadline = Instant::now() + Duration::from_secs(5);
    let mut result = publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
    while result < 0 && Instant::now() < offer_deadline {
        sleep(Duration::from_millis(1));
        result = publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
    }
    assert!(
        result >= msg.len() as i64,
        "Message should be sent successfully"
    );

    // Poll both subscriptions until each has seen the message or time runs out.
    let start_time = Instant::now();
    while (count1.load(Ordering::SeqCst) < 1 || count2.load(Ordering::SeqCst) < 1)
        && start_time.elapsed() < Duration::from_secs(5)
    {
        let _ = subscription1.poll_once(
            |_msg, _header| {
                count1.fetch_add(1, Ordering::SeqCst);
            },
            128,
        )?;
        let _ = subscription2.poll_once(
            |_msg, _header| {
                count2.fetch_add(1, Ordering::SeqCst);
            },
            128,
        )?;
    }

    assert!(
        count1.load(Ordering::SeqCst) >= 1,
        "Subscription 1 did not receive the message"
    );
    assert!(
        count2.load(Ordering::SeqCst) >= 1,
        "Subscription 2 did not receive the message"
    );

    stop.store(true, Ordering::SeqCst);
    let _ = driver_handle.join().unwrap();
    Ok(())
}
717
/// Closes a publication explicitly through each of the three close variants
/// (`close_with_no_args`, `close`, `close_once`) and then drops it, checking
/// that dropping after a manual close is accepted.
#[test]
#[serial]
pub fn should_be_able_to_drop_after_close_manually_being_closed(
) -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    // Embedded media driver running in its own, timestamp-suffixed directory.
    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    media_driver_ctx.set_dir(
        &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
    )?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    ctx.set_error_handler(Some(&Handler::leak(AeronErrorHandlerLogger)))?;

    let aeron = Aeron::new(&ctx)?;
    aeron.start()?;

    {
        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
        info!("created publication [sessionId={}]", publisher.session_id());
        publisher.close_with_no_args()?;
        drop(publisher);
    }

    {
        let publisher = aeron.add_publication(AERON_IPC_STREAM, 124, Duration::from_secs(5))?;
        info!("created publication [sessionId={}]", publisher.session_id());
        publisher.close(Handlers::no_notification_handler())?;
        drop(publisher);
    }

    {
        let publisher = aeron.add_publication(AERON_IPC_STREAM, 125, Duration::from_secs(5))?;
        // Log before closing, as in the two scopes above, so the session id
        // is never read from a publication that has already been closed.
        info!("created publication [sessionId={}]", publisher.session_id());
        publisher.close_once(|| println!("on close"))?;
        drop(publisher);
    }

    // Shut the embedded driver down like the sibling tests do, rather than
    // leaving its thread running after this test returns.
    stop.store(true, Ordering::SeqCst);
    let _ = driver_handle.join().unwrap();

    Ok(())
}
766
/// Closes a publication and then checks that offering on it yields a negative
/// result code.
#[test]
#[serial]
pub fn offer_on_closed_publication_error_test() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    // Embedded media driver running in its own, timestamp-suffixed directory.
    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    let driver_dir = format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock());
    media_driver_ctx.set_dir(&driver_dir.into_c_string())?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;

    let aeron = Aeron::new(&ctx)?;
    aeron.start()?;
    let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;

    publisher.close(Handlers::no_notification_handler())?;

    // The publication is closed, so this offer is expected to be rejected.
    let offer_result = publisher.offer(
        b"should fail",
        Handlers::no_reserved_value_supplier_handler(),
    );
    assert!(
        offer_result < 0,
        "Offering on a closed publication should return a negative error code"
    );

    stop.store(true, Ordering::SeqCst);
    let _ = driver_handle.join().unwrap();
    Ok(())
}
809
/// Publishes a zero-length message over IPC and expects the subscriber to
/// receive an empty payload within 5 seconds.
#[test]
#[serial]
pub fn empty_message_test() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    // Embedded media driver running in its own, timestamp-suffixed directory.
    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    media_driver_ctx.set_dir(
        &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
    )?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;

    let aeron = Aeron::new(&ctx)?;
    aeron.start()?;
    let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
    let subscription = aeron.add_subscription(
        AERON_IPC_STREAM,
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(5),
    )?;

    let empty_received = Arc::new(AtomicBool::new(false));

    // `offer` reports transient conditions such as back pressure as negative
    // codes, which mean nothing was published; retry for a bounded time
    // instead of failing on the very first attempt.
    let offer_deadline = Instant::now() + Duration::from_secs(5);
    let mut result = publisher.offer(b"", Handlers::no_reserved_value_supplier_handler());
    while result < 0 && Instant::now() < offer_deadline {
        sleep(Duration::from_millis(1));
        result = publisher.offer(b"", Handlers::no_reserved_value_supplier_handler());
    }
    assert!(result > 0);

    // The receive budget starts once the message has actually been published.
    let start_time = Instant::now();
    while !empty_received.load(Ordering::SeqCst)
        && start_time.elapsed() < Duration::from_secs(5)
    {
        let _ = subscription.poll_once(
            |msg, _header| {
                if msg.is_empty() {
                    empty_received.store(true, Ordering::SeqCst);
                }
            },
            128,
        )?;
    }

    assert!(
        empty_received.load(Ordering::SeqCst),
        "Empty message was not received"
    );
    stop.store(true, Ordering::SeqCst);
    let _ = driver_handle.join().unwrap();
    Ok(())
}
870
/// Ignored exploratory test: adds two subscriptions on the tagged channel
/// `aeron:udp?tags=100`, adds a publication carrying the same tag, publishes
/// one message and polls both subscriptions once.
///
/// Returns a `TimedOut` I/O error if the message cannot be published within
/// 30 seconds.
#[test]
#[serial]
#[ignore]
pub fn tags() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Debug)
        .try_init();

    let (md_ctx, stop, md) = start_media_driver(1)?;

    let (_a_ctx2, aeron_sub) = create_client(&md_ctx)?;

    info!("creating suscriber 1");
    let sub = aeron_sub
        .add_subscription(
            &"aeron:udp?tags=100".into_c_string(),
            123,
            Handlers::no_available_image_handler(),
            Handlers::no_unavailable_image_handler(),
            Duration::from_secs(50),
        )
        .map_err(|e| {
            error!("aeron error={}", Aeron::errmsg());
            e
        })?;

    // NOTE(review): this second client is started but the second subscription
    // below is still added through `aeron_sub` — confirm that is intended.
    let ctx = AeronContext::new()?;
    ctx.set_dir(&aeron_sub.context().get_dir().into_c_string())?;
    let aeron = Aeron::new(&ctx)?;
    aeron.start()?;

    info!("creating suscriber 2");
    let sub2 = aeron_sub.add_subscription(
        &"aeron:udp?tags=100".into_c_string(),
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(50),
    )?;

    let (_a_ctx1, aeron_pub) = create_client(&md_ctx)?;
    info!("creating publisher");
    assert!(!aeron_pub.is_closed());
    let publisher = aeron_pub
        .add_publication(
            &"aeron:udp?endpoint=localhost:4040|alias=test|tags=100".into_c_string(),
            123,
            Duration::from_secs(5),
        )
        .map_err(|e| {
            error!("aeron error={}", Aeron::errmsg());
            e
        })?;

    info!("publishing msg");

    // Retry the offer until it is accepted, but give up after 30 seconds so a
    // publication that never becomes usable cannot hang the test forever.
    let publish_started = Instant::now();
    loop {
        let result = publisher.offer(
            "213".as_bytes(),
            Handlers::no_reserved_value_supplier_handler(),
        );
        if result >= 0 {
            break;
        }
        error!(
            "failed to publish {:?}",
            AeronCError::from_code(result as i32)
        );
        if publish_started.elapsed() > Duration::from_secs(30) {
            stop.store(true, Ordering::SeqCst);
            return Err(Box::new(std::io::Error::new(
                std::io::ErrorKind::TimedOut,
                "Timeout exceeded",
            )));
        }
    }

    sub.poll_once(
        |msg, _header| {
            println!("Received message: {:?}", msg);
        },
        128,
    )?;
    sub2.poll_once(
        |msg, _header| {
            println!("Received message: {:?}", msg);
        },
        128,
    )?;

    // Stop the embedded driver and wait for its thread, as the other tests do.
    stop.store(true, Ordering::SeqCst);
    let _ = md.join().unwrap();

    Ok(())
}
960
961 fn create_client(
962 media_driver_ctx: &AeronDriverContext,
963 ) -> Result<(AeronContext, Aeron), Box<dyn Error>> {
964 let dir = media_driver_ctx.get_dir();
965 info!("creating aeron client [dir={}]", dir);
966 let ctx = AeronContext::new()?;
967 ctx.set_dir(&dir.into_c_string())?;
968 ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
969 let aeron = Aeron::new(&ctx)?;
970 aeron.start()?;
971 Ok((ctx, aeron))
972 }
973
974 fn start_media_driver(
975 instance: u64,
976 ) -> Result<
977 (
978 AeronDriverContext,
979 Arc<AtomicBool>,
980 JoinHandle<Result<(), rusteron_media_driver::AeronCError>>,
981 ),
982 Box<dyn Error>,
983 > {
984 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
985 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
986 media_driver_ctx.set_dir_delete_on_start(true)?;
987 media_driver_ctx.set_dir(
988 &format!(
989 "{}{}-{}",
990 media_driver_ctx.get_dir(),
991 Aeron::epoch_clock(),
992 instance
993 )
994 .into_c_string(),
995 )?;
996 let (stop, driver_handle) =
997 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
998 Ok((media_driver_ctx, stop, driver_handle))
999 }
1000
1001 #[doc = include_str!("../../README.md")]
1002 mod readme_tests {}
1003}