rusteron_archive/
lib.rs

1/**/
2#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
9//! # Features
10//!
11//! - **`static`**: When enabled, this feature statically links the Aeron C code.
12//!   By default, the library uses dynamic linking to the Aeron C libraries.
13//! - **`backtrace`**: When enabled will log a backtrace for each AeronCError
14//! - **`extra-logging`**: When enabled will log when resource is created and destroyed. Useful if you're seeing a segfault due to a resource being closed
15//! - **`log-c-bindings`**: When enabled will log every C binding call with arguments and return values. Useful for debugging FFI interactions
16//! - **`precompile`**: When enabled will use precompiled C code instead of requiring cmake and java to be installed
17
18#[allow(improper_ctypes_definitions)]
19#[allow(unpredictable_function_pointer_comparisons)]
20pub mod bindings {
21    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
22}
23
24use bindings::*;
25use std::cell::Cell;
26use std::os::raw::c_int;
27use std::time::{Duration, Instant};
28
29pub mod testing;
30
31include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
32include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
33
34pub type SourceLocation = bindings::aeron_archive_source_location_t;
35pub const SOURCE_LOCATION_LOCAL: aeron_archive_source_location_en =
36    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_LOCAL;
37pub const SOURCE_LOCATION_REMOTE: aeron_archive_source_location_en =
38    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_REMOTE;
39
40unsafe extern "C" fn no_op_idle_strategy_func(
41    _state: *mut std::os::raw::c_void,
42    _work_count: c_int,
43) {
44}
45
46pub struct RecordingPos;
47impl RecordingPos {
48    pub fn find_counter_id_by_session(
49        counter_reader: &AeronCountersReader,
50        session_id: i32,
51    ) -> i32 {
52        unsafe {
53            aeron_archive_recording_pos_find_counter_id_by_session_id(
54                counter_reader.get_inner(),
55                session_id,
56            )
57        }
58    }
59    pub fn find_counter_id_by_recording_id(
60        counter_reader: &AeronCountersReader,
61        recording_id: i64,
62    ) -> i32 {
63        unsafe {
64            aeron_archive_recording_pos_find_counter_id_by_recording_id(
65                counter_reader.get_inner(),
66                recording_id,
67            )
68        }
69    }
70
71    /// Return the recordingId embedded in the key of the given counter
72    /// if it is indeed a "recording position" counter. Otherwise return -1.
73    pub fn get_recording_id_block(
74        counters_reader: &AeronCountersReader,
75        counter_id: i32,
76        wait: Duration,
77    ) -> Result<i64, AeronCError> {
78        let mut result = Self::get_recording_id(counters_reader, counter_id);
79        let instant = Instant::now();
80
81        while result.is_err() && instant.elapsed() < wait {
82            result = Self::get_recording_id(counters_reader, counter_id);
83            #[cfg(debug_assertions)]
84            std::thread::sleep(Duration::from_millis(10));
85        }
86
87        return result;
88    }
89
90    /// Return the recordingId embedded in the key of the given counter
91    /// if it is indeed a "recording position" counter. Otherwise return -1.
92    pub fn get_recording_id(
93        counters_reader: &AeronCountersReader,
94        counter_id: i32,
95    ) -> Result<i64, AeronCError> {
96        /// The type id for an Aeron Archive recording position counter.
97        /// In Aeron Java, this is AeronCounters.ARCHIVE_RECORDING_POSITION_TYPE_ID (which is typically 100).
98        pub const RECORDING_POSITION_TYPE_ID: i32 = 100;
99
100        /// from Aeron Java code
101        pub const RECORD_ALLOCATED: i32 = 1;
102
103        /// A constant to mean "no valid recording ID".
104        pub const NULL_RECORDING_ID: i64 = -1;
105
106        if counter_id < 0 {
107            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
108        }
109
110        let state = counters_reader.counter_state(counter_id)?;
111        if state != RECORD_ALLOCATED {
112            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
113        }
114
115        let type_id = counters_reader.counter_type_id(counter_id)?;
116        if type_id != RECORDING_POSITION_TYPE_ID {
117            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
118        }
119
120        // Read the key area. For a RECORDING_POSITION_TYPE_ID counter:
121        //    - offset 0..8 => the i64 recording_id
122        //    - offset 8..12 => the session_id (int)
123        //    etc...
124        // only need the first 8 bytes to get the recordingId.
125        let recording_id = Cell::new(-1);
126        counters_reader.foreach_counter_once(|value, id, type_id, key, label| {
127            if id == counter_id && type_id == RECORDING_POSITION_TYPE_ID {
128                let mut val = [0u8; 8];
129                val.copy_from_slice(&key[0..8]);
130                let Ok(value) = i64::from_le_bytes(val).try_into();
131                recording_id.set(value);
132            }
133        });
134        let recording_id = recording_id.get();
135        if recording_id < 0 {
136            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
137        }
138
139        Ok(recording_id)
140    }
141}
142
143unsafe extern "C" fn default_encoded_credentials(
144    _clientd: *mut std::os::raw::c_void,
145) -> *mut aeron_archive_encoded_credentials_t {
146    // Allocate a zeroed instance of `aeron_archive_encoded_credentials_t`
147    let empty_credentials = Box::new(aeron_archive_encoded_credentials_t {
148        data: std::ptr::null(),
149        length: 0,
150    });
151    Box::into_raw(empty_credentials)
152}
153
154unsafe extern "C" fn default_credentials_free(
155    credentials: *mut aeron_archive_encoded_credentials_t,
156    _clientd: *mut std::os::raw::c_void,
157) {
158    if !credentials.is_null() {
159        let _ = Box::from_raw(credentials);
160    }
161}
162
163impl AeronArchive {
164    pub fn aeron(&self) -> Aeron {
165        self.get_archive_context().get_aeron()
166    }
167}
168
169impl AeronArchiveAsyncConnect {
170    #[inline]
171    /// recommend using this method instead of standard `new` as it will link the archive to aeron so if a drop occurs archive is dropped before aeron
172    pub fn new_with_aeron(ctx: &AeronArchiveContext, aeron: &Aeron) -> Result<Self, AeronCError> {
173        let resource_async = Self::new(ctx)?;
174        resource_async.inner.add_dependency(aeron.clone());
175        Ok(resource_async)
176    }
177}
178
179macro_rules! impl_archive_position_methods {
180    ($pub_type:ty) => {
181        impl $pub_type {
182            /// Retrieves the current active live archive position using the Aeron counters.
183            /// Returns an error if not found.
184            pub fn get_archive_position(&self) -> Result<i64, AeronCError> {
185                if let Some(aeron) = self.inner.get_dependency::<Aeron>() {
186                    let counter_reader = &aeron.counters_reader();
187                    self.get_archive_position_with(counter_reader)
188                } else {
189                    Err(AeronCError::from_code(-1))
190                }
191            }
192
193            /// Retrieves the current active live archive position using the provided counter reader.
194            /// Returns an error if not found.
195            pub fn get_archive_position_with(
196                &self,
197                counters: &AeronCountersReader,
198            ) -> Result<i64, AeronCError> {
199                let session_id = self.get_constants()?.session_id();
200                let counter_id = RecordingPos::find_counter_id_by_session(counters, session_id);
201                if counter_id < 0 {
202                    return Err(AeronCError::from_code(counter_id));
203                }
204                let position = counters.get_counter_value(counter_id);
205                if position < 0 {
206                    return Err(AeronCError::from_code(position as i32));
207                }
208                Ok(position)
209            }
210
211            /// Checks if the publication's current position is within a specified inclusive length
212            /// of the archive position.
213            pub fn is_archive_position_with(&self, length_inclusive: usize) -> bool {
214                let archive_position = self.get_archive_position().unwrap_or(-1);
215                if archive_position < 0 {
216                    return false;
217                }
218                self.position() - archive_position <= length_inclusive as i64
219            }
220        }
221    };
222}
223
224impl_archive_position_methods!(AeronPublication);
225impl_archive_position_methods!(AeronExclusivePublication);
226
227impl AeronArchiveContext {
228    // The method below sets no credentials supplier, which is essential for the operation
229    // of the Aeron Archive Context. The `set_credentials_supplier` must be set to prevent
230    // segmentation faults in the C bindings.
231    pub fn set_no_credentials_supplier(&self) -> Result<i32, AeronCError> {
232        let result = unsafe {
233            bindings::aeron_archive_context_set_credentials_supplier(
234                self.get_inner(),
235                Some(default_encoded_credentials),
236                None,
237                Some(default_credentials_free),
238                std::ptr::null_mut(),
239            )
240        };
241        if result < 0 {
242            Err(AeronCError::from_code(result))
243        } else {
244            Ok(result)
245        }
246    }
247
248    /// This method creates a new `AeronArchiveContext` with a no-op credentials supplier.
249    /// If you do not set a credentials supplier, it will segfault.
250    /// This method ensures that a non-functional credentials supplier is set to avoid the segfault.
251    pub fn new_with_no_credentials_supplier(
252        aeron: &Aeron,
253        request_control_channel: &str,
254        response_control_channel: &str,
255        recording_events_channel: &str,
256    ) -> Result<AeronArchiveContext, AeronCError> {
257        let context = Self::new()?;
258        context.set_no_credentials_supplier()?;
259        context.set_aeron(aeron)?;
260        context.set_control_request_channel(&request_control_channel.into_c_string())?;
261        context.set_control_response_channel(&response_control_channel.into_c_string())?;
262        context.set_recording_events_channel(&recording_events_channel.into_c_string())?;
263        // see https://github.com/gsrxyz/rusteron/issues/18
264        // Use a plain function pointer with null clientd to avoid sharing mutable handler state.
265        let result = unsafe {
266            bindings::aeron_archive_context_set_idle_strategy(
267                context.get_inner(),
268                Some(no_op_idle_strategy_func),
269                std::ptr::null_mut(),
270            )
271        };
272        if result < 0 {
273            return Err(AeronCError::from_code(result));
274        }
275        Ok(context)
276    }
277}
278
279#[cfg(test)]
280mod tests {
281    use super::*;
282    use log::{error, info};
283
284    use crate::testing::EmbeddedArchiveMediaDriverProcess;
285    use serial_test::serial;
286    use std::cell::Cell;
287    use std::error;
288    use std::error::Error;
289    use std::str::FromStr;
290    use std::sync::atomic::{AtomicBool, Ordering};
291    use std::sync::Arc;
292    use std::thread::{sleep, JoinHandle};
293    use std::time::{Duration, Instant};
294
295    #[derive(Default, Debug)]
296    struct ErrorCount {
297        error_count: usize,
298    }
299
300    impl AeronErrorHandlerCallback for ErrorCount {
301        fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
302            error!("Aeron error {}: {}", error_code, msg);
303            self.error_count += 1;
304        }
305    }
306
307    pub const ARCHIVE_CONTROL_REQUEST: &str = "aeron:udp?endpoint=localhost:8010";
308    pub const ARCHIVE_CONTROL_RESPONSE: &str = "aeron:udp?endpoint=localhost:8011";
309    pub const ARCHIVE_RECORDING_EVENTS: &str =
310        "aeron:udp?control-mode=dynamic|control=localhost:8012";
311
312    #[test]
313    fn test_uri_string_builder() -> Result<(), AeronCError> {
314        let builder = AeronUriStringBuilder::default();
315
316        builder.init_new()?;
317        builder
318            .media(Media::Udp)? // very important to set media else set_initial_position will give an error of -1
319            .mtu_length(1024 * 64)?
320            .set_initial_position(127424949617280, 1182294755, 65536)?;
321        let uri = builder.build(1024)?;
322        assert_eq!("aeron:udp?term-id=-1168322114|term-length=65536|mtu=65536|init-term-id=1182294755|term-offset=33408", uri);
323
324        builder.init_new()?;
325        let uri = builder
326            .media(Media::Udp)?
327            .control_mode(ControlMode::Dynamic)?
328            .reliable(false)?
329            .ttl(2)?
330            .endpoint("localhost:1235")?
331            .control("localhost:1234")?
332            .build(1024)?;
333        assert_eq!("aeron:udp?ttl=2|control-mode=dynamic|endpoint=localhost:1235|control=localhost:1234|reliable=false", uri);
334
335        let uri = AeronUriStringBuilder::from_str("aeron:udp?endpoint=localhost:8010")?
336            .ttl(5)?
337            .build(1024)?;
338
339        assert_eq!("aeron:udp?ttl=5|endpoint=localhost:8010", uri);
340
341        let uri = uri.parse::<AeronUriStringBuilder>()?.ttl(6)?.build(1024)?;
342
343        assert_eq!("aeron:udp?ttl=6|endpoint=localhost:8010", uri);
344
345        Ok(())
346    }
347
348    pub const STREAM_ID: i32 = 1033;
349    pub const MESSAGE_PREFIX: &str = "Message-Prefix-";
350    pub const CONTROL_ENDPOINT: &str = "localhost:23265";
351    pub const RECORDING_ENDPOINT: &str = "localhost:23266";
352    pub const LIVE_ENDPOINT: &str = "localhost:23267";
353    pub const REPLAY_ENDPOINT: &str = "localhost:0";
354    // pub const REPLAY_ENDPOINT: &str = "localhost:23268";
355
356    #[test]
357    #[serial]
358    fn test_simple_replay_merge() -> Result<(), AeronCError> {
359        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
360
361        EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
362            .expect("failed to kill all java processes");
363
364        assert!(is_udp_port_available(23265));
365        assert!(is_udp_port_available(23266));
366        assert!(is_udp_port_available(23267));
367        assert!(is_udp_port_available(23268));
368        let id = Aeron::nano_clock();
369        let aeron_dir = format!("target/aeron/{}/shm", id);
370        let archive_dir = format!("target/aeron/{}/archive", id);
371
372        info!("starting archive media driver");
373        let media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
374            &aeron_dir,
375            &format!("{}/archive", aeron_dir),
376            ARCHIVE_CONTROL_REQUEST,
377            ARCHIVE_CONTROL_RESPONSE,
378            ARCHIVE_RECORDING_EVENTS,
379        )
380        .expect("Failed to start embedded media driver");
381
382        info!("connecting to archive");
383        let (archive, aeron) = media_driver
384            .archive_connect()
385            .expect("Could not connect to archive client");
386
387        let running = Arc::new(AtomicBool::new(true));
388
389        info!("connected to archive, adding publication");
390        assert!(!aeron.is_closed());
391
392        let (session_id, publisher_thread) =
393            reply_merge_publisher(&archive, aeron.clone(), running.clone())?;
394
395        {
396            let context = AeronContext::new()?;
397            context.set_dir(&media_driver.aeron_dir)?;
398            let mut error_handler = Handler::leak(ErrorCount::default());
399            context.set_error_handler(Some(&error_handler))?;
400            context.set_driver_timeout_ms(60_000)?;
401
402            // Wrap fallible code so release() is always called even on error/panic
403            let inner: Result<(), AeronCError> = (|| {
404                let aeron = Aeron::new(&context)?;
405                aeron.start()?;
406                let aeron_archive_context = archive.get_archive_context();
407                let aeron_archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
408                    &aeron,
409                    aeron_archive_context.get_control_request_channel(),
410                    aeron_archive_context.get_control_response_channel(),
411                    aeron_archive_context.get_recording_events_channel(),
412                )?;
413                aeron_archive_context.set_message_timeout_ns(60_000_000_000)?;
414                aeron_archive_context.set_error_handler(Some(&error_handler))?;
415                let merge_archive =
416                    AeronArchiveAsyncConnect::new_with_aeron(&aeron_archive_context, &aeron)?
417                        .poll_blocking(Duration::from_secs(60))?;
418                replay_merge_subscription(&merge_archive, aeron.clone(), session_id)?;
419                Ok(())
420            })();
421
422            error_handler.release();
423            inner?;
424        }
425
426        running.store(false, Ordering::Release);
427        publisher_thread.join().unwrap();
428        drop(archive);
429        drop(aeron);
430        drop(media_driver);
431
432        Ok(())
433    }
434
435    fn reply_merge_publisher(
436        archive: &AeronArchive,
437        aeron: Aeron,
438        running: Arc<AtomicBool>,
439    ) -> Result<(i32, JoinHandle<()>), AeronCError> {
440        let publication = aeron.add_publication(
441            // &format!("aeron:udp?control={CONTROL_ENDPOINT}|control-mode=dynamic|term-length=65536|fc=tagged,g:99901/1,t:5s"),
442            &format!("aeron:udp?control={CONTROL_ENDPOINT}|control-mode=dynamic|term-length=65536")
443                .into_c_string(),
444            STREAM_ID,
445            Duration::from_secs(5),
446        )?;
447
448        info!(
449            "publication {} [status={:?}]",
450            publication.channel(),
451            publication.channel_status()
452        );
453        assert_eq!(1, publication.channel_status());
454
455        let session_id = publication.session_id();
456        let recording_channel = format!(
457            // "aeron:udp?endpoint={RECORDING_ENDPOINT}|control={CONTROL_ENDPOINT}|session-id={session_id}|gtag=99901"
458            "aeron:udp?endpoint={RECORDING_ENDPOINT}|control={CONTROL_ENDPOINT}|session-id={session_id}"
459        );
460        info!("recording channel {}", recording_channel);
461        archive.start_recording(
462            &recording_channel.into_c_string(),
463            STREAM_ID,
464            SOURCE_LOCATION_REMOTE,
465            true,
466        )?;
467
468        info!("waiting for publisher to be connected");
469        while !publication.is_connected() {
470            thread::sleep(Duration::from_millis(100));
471        }
472        info!("publisher to be connected");
473        let counters_reader = aeron.counters_reader();
474        let mut caught_up_count = 0;
475        let publisher_thread = thread::spawn(move || {
476            let mut message_count = 0;
477
478            while running.load(Ordering::Acquire) {
479                let message = format!("{}{}", MESSAGE_PREFIX, message_count);
480                while publication.offer(
481                    message.as_bytes(),
482                    Handlers::no_reserved_value_supplier_handler(),
483                ) <= 0
484                {
485                    thread::sleep(Duration::from_millis(10));
486                }
487                message_count += 1;
488                if message_count % 10_000 == 0 {
489                    info!(
490                        "Published {} messages [position={}]",
491                        message_count,
492                        publication.position()
493                    );
494                }
495                // slow down publishing so can catch up
496                if message_count > 10_000 {
497                    // ensure archiver is caught up
498                    while !publication.is_archive_position_with(0) {
499                        thread::sleep(Duration::from_micros(300));
500                    }
501                    caught_up_count += 1;
502                }
503            }
504            assert!(caught_up_count > 0);
505            if let Err(err) = publication.close(Handlers::no_notification_handler()) {
506                info!("publisher close returned error: {err:?}");
507            }
508            info!("Publisher thread terminated");
509        });
510        Ok((session_id, publisher_thread))
511    }
512
513    fn replay_merge_subscription(
514        archive: &AeronArchive,
515        aeron: Aeron,
516        session_id: i32,
517    ) -> Result<(), AeronCError> {
518        // let replay_channel = format!("aeron:udp?control-mode=manual|session-id={session_id}");
519        let replay_channel = format!("aeron:udp?session-id={session_id}").into_c_string();
520        info!("replay channel {:?}", replay_channel);
521
522        let replay_destination = format!("aeron:udp?endpoint={REPLAY_ENDPOINT}").into_c_string();
523        info!("replay destination {:?}", replay_destination);
524
525        let live_destination =
526            format!("aeron:udp?endpoint={LIVE_ENDPOINT}|control={CONTROL_ENDPOINT}")
527                .into_c_string();
528        info!("live destination {:?}", live_destination);
529
530        let counters_reader = aeron.counters_reader();
531        let mut counter_id = -1;
532
533        while counter_id < 0 {
534            counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
535        }
536        info!(
537            "counter id {} {:?}",
538            counter_id,
539            counters_reader.get_counter_label(counter_id, 1024)
540        );
541        info!(
542            "counter id {} position={:?}",
543            counter_id,
544            counters_reader.get_counter_value(counter_id)
545        );
546
547        // let recording_id = Cell::new(-1);
548        // let start_position = Cell::new(-1);
549
550        // let mut count = 0;
551        // assert!(
552        //     archive.list_recordings_once(&mut count, 0, 1000, |descriptor| {
553        //         info!("Recording descriptor: {:?}", descriptor);
554        //         recording_id.set(descriptor.recording_id);
555        //         start_position.set(descriptor.start_position);
556        //         assert_eq!(descriptor.session_id, session_id);
557        //         assert_eq!(descriptor.stream_id, STREAM_ID);
558        //     })? >= 0
559        // );
560        // assert!(count > 0);
561        // assert!(recording_id.get() >= 0);
562
563        // let record_id = RecordingPos::get_recording_id(&aeron.counters_reader(), counter_id)?;
564        // assert_eq!(recording_id.get(), record_id);
565        //
566        // let recording_id = recording_id.get();
567        // let start_position = start_position.get();
568        let start_position = 0;
569        let recording_id = RecordingPos::get_recording_id_block(
570            &aeron.counters_reader(),
571            counter_id,
572            Duration::from_secs(5),
573        )?;
574
575        let subscribe_channel =
576            format!("aeron:udp?control-mode=manual|session-id={session_id}").into_c_string();
577        info!("subscribe channel {:?}", subscribe_channel);
578        let subscription = aeron.add_subscription(
579            &subscribe_channel,
580            STREAM_ID,
581            Handlers::no_available_image_handler(),
582            Handlers::no_unavailable_image_handler(),
583            Duration::from_secs(5),
584        )?;
585
586        let replay_merge = AeronArchiveReplayMerge::new(
587            &subscription,
588            &archive,
589            &replay_channel,
590            &replay_destination,
591            &live_destination,
592            recording_id,
593            start_position,
594            Aeron::epoch_clock(),
595            10_000,
596        )?;
597
598        info!(
599            "ReplayMerge initialization: recordingId={}, startPosition={}, subscriptionChannel={:?}, replayChannel={:?}, replayDestination={:?}, liveDestination={:?}",
600            recording_id,
601            start_position,
602            subscribe_channel,
603            &replay_channel,
604            &replay_destination,
605            &live_destination
606        );
607
608        // media_driver
609        //     .run_aeron_stats()
610        //     .expect("Failed to run aeron stats");
611
612        // info!("Waiting for subscription to connect...");
613        // while !subscription.is_connected() {
614        //     thread::sleep(Duration::from_millis(100));
615        // }
616        // info!("Subscription connected");
617
618        info!(
619            "about to start_replay [maxRecordPosition={:?}]",
620            archive.get_max_recorded_position(recording_id)
621        );
622
623        let mut reply_count = 0;
624        while !replay_merge.is_merged() {
625            assert!(!replay_merge.has_failed());
626            if replay_merge.poll_once(
627                |buffer, _header| {
628                    reply_count += 1;
629                    if reply_count % 10_000 == 0 {
630                        info!(
631                            "replay-merge [count={}, isMerged={}, isLive={}]",
632                            reply_count,
633                            replay_merge.is_merged(),
634                            replay_merge.is_live_added()
635                        );
636                    }
637                },
638                100,
639            )? == 0
640            {
641                let err = archive.poll_for_error_response_as_string(4096)?;
642                if !err.is_empty() {
643                    panic!("{}", err);
644                }
645                if Aeron::errmsg().len() > 0 && "no error" != Aeron::errmsg() {
646                    panic!("{}", Aeron::errmsg());
647                }
648                archive.poll_for_recording_signals()?;
649                thread::sleep(Duration::from_millis(100));
650            }
651        }
652        assert!(!replay_merge.has_failed());
653        assert!(replay_merge.is_live_added());
654        assert!(reply_count > 10_000, "no replay-merge fragments received");
655        if let Err(err) = replay_merge.close() {
656            info!("replay merge close returned error: {err:?}");
657        }
658        if let Err(err) = subscription.close(Handlers::no_notification_handler()) {
659            info!("replay subscription close returned error: {err:?}");
660        }
661        Ok(())
662    }
663
664    #[test]
665    fn version_check() {
666        let major = unsafe { crate::aeron_version_major() };
667        let minor = unsafe { crate::aeron_version_minor() };
668        let patch = unsafe { crate::aeron_version_patch() };
669
670        let aeron_version = format!("{}.{}.{}", major, minor, patch);
671
672        let cargo_version = "1.50.2";
673        assert_eq!(aeron_version, cargo_version);
674    }
675
676    use std::thread;
677
678    fn start_aeron_archive() -> Result<
679        (
680            Aeron,
681            AeronArchiveContext,
682            EmbeddedArchiveMediaDriverProcess,
683            Handler<AeronPublicationErrorFrameHandlerLogger>,
684            Handler<ErrorCount>,
685        ),
686        Box<dyn Error>,
687    > {
688        let id = Aeron::nano_clock();
689        let aeron_dir = format!("target/aeron/{}/shm", id);
690        let archive_dir = format!("target/aeron/{}/archive", id);
691
692        let request_port = find_unused_udp_port(8000).expect("Could not find port");
693        let response_port = find_unused_udp_port(request_port + 1).expect("Could not find port");
694        let recording_event_port =
695            find_unused_udp_port(response_port + 1).expect("Could not find port");
696        let request_control_channel = &format!("aeron:udp?endpoint=localhost:{}", request_port);
697        let response_control_channel = &format!("aeron:udp?endpoint=localhost:{}", response_port);
698        let recording_events_channel =
699            &format!("aeron:udp?endpoint=localhost:{}", recording_event_port);
700        assert_ne!(request_control_channel, response_control_channel);
701
702        let archive_media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
703            &aeron_dir,
704            &archive_dir,
705            request_control_channel,
706            response_control_channel,
707            recording_events_channel,
708        )
709        .expect("Failed to start Java process");
710
711        let aeron_context = AeronContext::new()?;
712        aeron_context.set_dir(&aeron_dir.into_c_string())?;
713        aeron_context.set_client_name(&"test".into_c_string())?;
714        let mut pub_error_frame_handler = Handler::leak(AeronPublicationErrorFrameHandlerLogger);
715        aeron_context.set_publication_error_frame_handler(Some(&pub_error_frame_handler))?;
716        let mut error_handler = Handler::leak(ErrorCount::default());
717        aeron_context.set_error_handler(Some(&error_handler))?;
718
719        // Use inner closure so we can call release() on any error path after handlers are created
720        let inner: Result<(Aeron, AeronArchiveContext), Box<dyn Error>> = (|| {
721            let aeron = Aeron::new(&aeron_context)?;
722            aeron.start()?;
723            let archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
724                &aeron,
725                request_control_channel,
726                response_control_channel,
727                recording_events_channel,
728            )?;
729            archive_context.set_error_handler(Some(&error_handler))?;
730            Ok((aeron, archive_context))
731        })();
732
733        match inner {
734            Ok((aeron, archive_context)) => Ok((
735                aeron,
736                archive_context,
737                archive_media_driver,
738                pub_error_frame_handler,
739                error_handler,
740            )),
741            Err(e) => {
742                pub_error_frame_handler.release();
743                error_handler.release();
744                Err(e)
745            }
746        }
747    }
748
749    #[test]
750    #[serial]
751    pub fn test_aeron_archive() -> Result<(), Box<dyn error::Error>> {
752        rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
753        EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
754            .expect("failed to kill all java processes");
755
756        let (aeron, archive_context, media_driver, mut pub_error_frame_handler, mut error_handler) =
757            start_aeron_archive()?;
758
759        let test_result: Result<(), Box<dyn error::Error>> = (|| {
760            assert!(!aeron.is_closed());
761
762            info!("connected to aeron");
763
764            let archive_connector =
765                AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
766            let archive = archive_connector
767                .poll_blocking(Duration::from_secs(30))
768                .expect("failed to connect to aeron archive media driver");
769
770            assert!(archive.get_archive_id() > 0);
771
772            let channel = AERON_IPC_STREAM;
773            let stream_id = 10;
774
775            let subscription_id =
776                archive.start_recording(channel, stream_id, SOURCE_LOCATION_LOCAL, true)?;
777
778            assert!(subscription_id >= 0);
779            info!("subscription id {}", subscription_id);
780
781            let publication = aeron
782                .async_add_exclusive_publication(channel, stream_id)?
783                .poll_blocking(Duration::from_secs(5))?;
784
785            for i in 0..11 {
786                while publication.offer(
787                    "123456".as_bytes(),
788                    Handlers::no_reserved_value_supplier_handler(),
789                ) <= 0
790                {
791                    sleep(Duration::from_millis(50));
792                    archive.poll_for_recording_signals()?;
793                    let err = archive.poll_for_error_response_as_string(4096)?;
794                    if !err.is_empty() {
795                        return Err(std::io::Error::other(err).into());
796                    }
797                    archive.idle();
798                }
799                info!("sent message {i} [test_aeron_archive]");
800            }
801
802            archive.idle();
803            let session_id = publication.get_constants()?.session_id;
804            info!("publication session id {}", session_id);
805            // since this is single threaded need to make sure it did write to archiver, usually not required in multi-proccess app
806            let stop_position = publication.position();
807            info!(
808                "publication stop position {} [publication={:?}]",
809                stop_position,
810                publication.get_constants()
811            );
812            let counters_reader = aeron.counters_reader();
813            info!("counters reader ready {:?}", counters_reader);
814
815            let mut counter_id = -1;
816
817            let start = Instant::now();
818            while counter_id <= 0 && start.elapsed() < Duration::from_secs(5) {
819                counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
820                info!("counter id {}", counter_id);
821            }
822
823            assert!(counter_id >= 0);
824
825            info!("counter id {counter_id}, session id {session_id}");
826            while counters_reader.get_counter_value(counter_id) < stop_position {
827                info!(
828                    "current archive publication stop position {}",
829                    counters_reader.get_counter_value(counter_id)
830                );
831                sleep(Duration::from_millis(50));
832            }
833            info!(
834                "found archive publication stop position {}",
835                counters_reader.get_counter_value(counter_id)
836            );
837
838            archive.stop_recording_channel_and_stream(channel, stream_id)?;
839            drop(publication);
840
841            info!("list recordings");
842            let found_recording_id = Cell::new(-1);
843            let start_pos = Cell::new(-1);
844            let end_pos = Cell::new(-1);
845            let start = Instant::now();
846            while start.elapsed() < Duration::from_secs(5) && found_recording_id.get() == -1 {
847                let mut count = 0;
848                archive.list_recordings_for_uri_once(
849                    &mut count,
850                    0,
851                    i32::MAX,
852                    channel,
853                    stream_id,
854                    |d: AeronArchiveRecordingDescriptor| {
855                        assert_eq!(d.stream_id, stream_id);
856                        info!("found recording {:#?}", d);
857                        info!(
858                            "strippedChannel={}, originalChannel={}",
859                            d.stripped_channel(),
860                            d.original_channel()
861                        );
862                        if d.stop_position > d.start_position && d.stop_position > 0 {
863                            found_recording_id.set(d.recording_id);
864                            start_pos.set(d.start_position);
865                            end_pos.set(d.stop_position);
866                        }
867
868                        // verify clone_struct works
869                        let copy = d.clone_struct();
870                        assert_eq!(copy.deref(), d.deref());
871                        assert_eq!(copy.recording_id, d.recording_id);
872                        assert_eq!(copy.control_session_id, d.control_session_id);
873                        assert_eq!(copy.mtu_length, d.mtu_length);
874                        assert_eq!(copy.source_identity_length, d.source_identity_length);
875                    },
876                )?;
877                archive.poll_for_recording_signals()?;
878                let err = archive.poll_for_error_response_as_string(4096)?;
879                if !err.is_empty() {
880                    return Err(std::io::Error::other(err).into());
881                }
882            }
883            assert!(start.elapsed() < Duration::from_secs(5));
884            info!("start replay");
885            let params = AeronArchiveReplayParams::new(
886                0,
887                i32::MAX,
888                start_pos.get(),
889                end_pos.get() - start_pos.get(),
890                0,
891                0,
892            )?;
893            info!("replay params {:#?}", params);
894            let replay_stream_id = 45;
895            let replay_session_id = archive.start_replay(
896                found_recording_id.get(),
897                channel,
898                replay_stream_id,
899                &params,
900            )?;
901            let session_id = replay_session_id as i32;
902
903            info!("replay session id {}", replay_session_id);
904            info!("session id {}", session_id);
905            let channel_replay =
906                format!("{}?session-id={}", channel.to_str().unwrap(), session_id).into_c_string();
907            info!("archive id: {}", archive.get_archive_id());
908
909            info!("add subscription {:?}", channel_replay);
910            let mut avail_image_handler = Handler::leak(AeronAvailableImageLogger);
911            let mut unavail_image_handler = Handler::leak(AeronUnavailableImageLogger);
912            let replay_result: Result<(), Box<dyn error::Error>> = (|| {
913                let subscription = aeron
914                    .async_add_subscription(
915                        &channel_replay,
916                        replay_stream_id,
917                        Some(&avail_image_handler),
918                        Some(&unavail_image_handler),
919                    )?
920                    .poll_blocking(Duration::from_secs(10))?;
921
922                #[derive(Default)]
923                struct FragmentHandler {
924                    count: Cell<usize>,
925                }
926
927                impl AeronFragmentHandlerCallback for FragmentHandler {
928                    fn handle_aeron_fragment_handler(
929                        &mut self,
930                        buffer: &[u8],
931                        _header: AeronHeader,
932                    ) {
933                        assert_eq!(buffer, "123456".as_bytes());
934
935                        // Update count (using Cell for interior mutability)
936                        self.count.set(self.count.get() + 1);
937                    }
938                }
939
940                let mut poll = Handler::leak(FragmentHandler::default());
941                let poll_result: Result<(), Box<dyn error::Error>> = (|| {
942                    let wait_timeout = Duration::from_secs(30);
943                    let start = Instant::now();
944                    while start.elapsed() < wait_timeout
945                        && subscription.poll(Some(&poll), 100)? <= 0
946                    {
947                        let err = archive.poll_for_error_response_as_string(4096)?;
948                        if !err.is_empty() {
949                            return Err(std::io::Error::other(err).into());
950                        }
951                    }
952
953                    if start.elapsed() >= wait_timeout {
954                        return Err(std::io::Error::other(format!(
955                            "messages not received {:?}",
956                            poll.count
957                        ))
958                        .into());
959                    }
960
961                    info!("aeron {:?}", aeron);
962                    info!("ctx {:?}", archive_context);
963                    if poll.count.get() != 11 {
964                        return Err(std::io::Error::other(format!(
965                            "expected 11 replayed messages, got {}",
966                            poll.count.get()
967                        ))
968                        .into());
969                    }
970                    Ok(())
971                })();
972
973                drop(subscription);
974                poll.release();
975                poll_result
976            })();
977
978            avail_image_handler.release();
979            unavail_image_handler.release();
980            replay_result?;
981            drop(archive);
982            Ok(())
983        })();
984
985        drop(aeron);
986        drop(media_driver);
987        pub_error_frame_handler.release();
988        error_handler.release();
989        test_result
990    }
991
992    #[test]
993    #[serial]
994    fn test_invalid_recording_channel() -> Result<(), Box<dyn Error>> {
995        let (aeron, archive_context, media_driver, mut pub_error_frame_handler, mut error_handler) =
996            start_aeron_archive()?;
997        let archive_connector =
998            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
999        let archive = archive_connector
1000            .poll_blocking(Duration::from_secs(30))
1001            .expect("failed to connect to archive");
1002
1003        let invalid_channel = "invalid:channel".into_c_string();
1004        let result =
1005            archive.start_recording(&invalid_channel, STREAM_ID, SOURCE_LOCATION_LOCAL, true);
1006        assert!(
1007            result.is_err(),
1008            "Expected error when starting recording with an invalid channel"
1009        );
1010        drop(archive);
1011        drop(aeron);
1012        drop(media_driver);
1013        pub_error_frame_handler.release();
1014        error_handler.release();
1015        Ok(())
1016    }
1017
1018    #[test]
1019    #[serial]
1020    fn test_stop_recording_on_nonexistent_channel() -> Result<(), Box<dyn Error>> {
1021        let (aeron, archive_context, media_driver, mut pub_error_frame_handler, mut error_handler) =
1022            start_aeron_archive()?;
1023        let archive_connector =
1024            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
1025        let archive = archive_connector
1026            .poll_blocking(Duration::from_secs(30))
1027            .expect("failed to connect to archive");
1028
1029        let nonexistent_channel = &"aeron:udp?endpoint=localhost:9999".into_c_string();
1030        let result = archive.stop_recording_channel_and_stream(nonexistent_channel, STREAM_ID);
1031        assert!(
1032            result.is_err(),
1033            "Expected error when stopping recording on a non-existent channel"
1034        );
1035        drop(archive);
1036        drop(aeron);
1037        drop(media_driver);
1038        pub_error_frame_handler.release();
1039        error_handler.release();
1040        Ok(())
1041    }
1042
1043    #[test]
1044    #[serial]
1045    fn test_replay_with_invalid_recording_id() -> Result<(), Box<dyn Error>> {
1046        let (aeron, archive_context, media_driver, mut pub_error_frame_handler, mut error_handler) =
1047            start_aeron_archive()?;
1048        let archive_connector =
1049            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
1050        let archive = archive_connector
1051            .poll_blocking(Duration::from_secs(30))
1052            .expect("failed to connect to archive");
1053
1054        let invalid_recording_id = -999;
1055        let params = AeronArchiveReplayParams::new(0, i32::MAX, 0, 100, 0, 0)?;
1056        let result = archive.start_replay(
1057            invalid_recording_id,
1058            &"aeron:udp?endpoint=localhost:8888".into_c_string(),
1059            STREAM_ID,
1060            &params,
1061        );
1062        assert!(
1063            result.is_err(),
1064            "Expected error when starting replay with an invalid recording id"
1065        );
1066        drop(archive);
1067        drop(aeron);
1068        drop(media_driver);
1069        pub_error_frame_handler.release();
1070        error_handler.release();
1071        Ok(())
1072    }
1073
1074    #[test]
1075    #[serial]
1076    fn test_archive_reconnect_after_close() -> Result<(), Box<dyn std::error::Error>> {
1077        let (aeron, archive_context, media_driver, mut pub_error_frame_handler, mut error_handler) =
1078            start_aeron_archive()?;
1079        let archive_connector =
1080            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
1081        let archive = archive_connector
1082            .poll_blocking(Duration::from_secs(30))
1083            .expect("failed to connect to archive");
1084
1085        drop(archive);
1086
1087        let archive_connector = AeronArchiveAsyncConnect::new_with_aeron(&archive_context, &aeron)?;
1088        let new_archive = archive_connector
1089            .poll_blocking(Duration::from_secs(30))
1090            .expect("failed to reconnect to archive");
1091        assert!(
1092            new_archive.get_archive_id() > 0,
1093            "Reconnected archive should have a valid archive id"
1094        );
1095
1096        drop(new_archive);
1097        drop(aeron);
1098        drop(media_driver);
1099        pub_error_frame_handler.release();
1100        error_handler.release();
1101        Ok(())
1102    }
1103}
1104
1105// run `just slow-tests`
1106#[cfg(test)]
1107mod slow_consumer_test;