rusteron_archive/
lib.rs

1/**/
2#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
9//! # Features
10//!
11//! - **`static`**: When enabled, this feature statically links the Aeron C code.
12//!   By default, the library uses dynamic linking to the Aeron C libraries.
//! - **`backtrace`**: When enabled, logs a backtrace for each `AeronCError`.
//! - **`extra-logging`**: When enabled, logs when a resource is created and destroyed. Useful if you're seeing a segfault due to a resource being closed.
//! - **`log-c-bindings`**: When enabled, logs every C binding call with its arguments and return values. Useful for debugging FFI interactions.
//! - **`precompile`**: When enabled, uses precompiled C code instead of requiring cmake and java to be installed.
17
// Module holding the contents of `bindings.rs` from the build's `OUT_DIR`
// (presumably the generated C bindings written by the build script — confirm).
// The two lints are silenced for everything inside the included file.
#[allow(improper_ctypes_definitions)]
#[allow(unpredictable_function_pointer_comparisons)]
pub mod bindings {
    // Textually includes `$OUT_DIR/bindings.rs` at compile time.
    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
}
23
24use bindings::*;
25use std::cell::Cell;
26use std::os::raw::c_int;
27use std::time::{Duration, Instant};
28
29pub mod testing;
30
31include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
32include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
33
/// Alias for the archive source-location type exposed by the bindings module.
pub type SourceLocation = bindings::aeron_archive_source_location_t;
/// Shorthand for `SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_LOCAL`.
pub const SOURCE_LOCATION_LOCAL: aeron_archive_source_location_en =
    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_LOCAL;
/// Shorthand for `SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_REMOTE`.
pub const SOURCE_LOCATION_REMOTE: aeron_archive_source_location_en =
    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_REMOTE;
39
40pub struct NoOpAeronIdleStrategyFunc;
41
42impl AeronIdleStrategyFuncCallback for NoOpAeronIdleStrategyFunc {
43    fn handle_aeron_idle_strategy_func(&mut self, _work_count: c_int) -> () {}
44}
45
46pub struct RecordingPos;
47impl RecordingPos {
48    pub fn find_counter_id_by_session(
49        counter_reader: &AeronCountersReader,
50        session_id: i32,
51    ) -> i32 {
52        unsafe {
53            aeron_archive_recording_pos_find_counter_id_by_session_id(
54                counter_reader.get_inner(),
55                session_id,
56            )
57        }
58    }
59    pub fn find_counter_id_by_recording_id(
60        counter_reader: &AeronCountersReader,
61        recording_id: i64,
62    ) -> i32 {
63        unsafe {
64            aeron_archive_recording_pos_find_counter_id_by_recording_id(
65                counter_reader.get_inner(),
66                recording_id,
67            )
68        }
69    }
70
71    /// Return the recordingId embedded in the key of the given counter
72    /// if it is indeed a "recording position" counter. Otherwise return -1.
73    pub fn get_recording_id_block(
74        counters_reader: &AeronCountersReader,
75        counter_id: i32,
76        wait: Duration,
77    ) -> Result<i64, AeronCError> {
78        let mut result = Self::get_recording_id(counters_reader, counter_id);
79        let instant = Instant::now();
80
81        while result.is_err() && instant.elapsed() < wait {
82            result = Self::get_recording_id(counters_reader, counter_id);
83            #[cfg(debug_assertions)]
84            std::thread::sleep(Duration::from_millis(10));
85        }
86
87        return result;
88    }
89
90    /// Return the recordingId embedded in the key of the given counter
91    /// if it is indeed a "recording position" counter. Otherwise return -1.
92    pub fn get_recording_id(
93        counters_reader: &AeronCountersReader,
94        counter_id: i32,
95    ) -> Result<i64, AeronCError> {
96        /// The type id for an Aeron Archive recording position counter.
97        /// In Aeron Java, this is AeronCounters.ARCHIVE_RECORDING_POSITION_TYPE_ID (which is typically 100).
98        pub const RECORDING_POSITION_TYPE_ID: i32 = 100;
99
100        /// from Aeron Java code
101        pub const RECORD_ALLOCATED: i32 = 1;
102
103        /// A constant to mean "no valid recording ID".
104        pub const NULL_RECORDING_ID: i64 = -1;
105
106        if counter_id < 0 {
107            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
108        }
109
110        let state = counters_reader.counter_state(counter_id)?;
111        if state != RECORD_ALLOCATED {
112            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
113        }
114
115        let type_id = counters_reader.counter_type_id(counter_id)?;
116        if type_id != RECORDING_POSITION_TYPE_ID {
117            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
118        }
119
120        // Read the key area. For a RECORDING_POSITION_TYPE_ID counter:
121        //    - offset 0..8 => the i64 recording_id
122        //    - offset 8..12 => the session_id (int)
123        //    etc...
124        // only need the first 8 bytes to get the recordingId.
125        let recording_id = Cell::new(-1);
126        counters_reader.foreach_counter_once(|value, id, type_id, key, label| {
127            if id == counter_id && type_id == RECORDING_POSITION_TYPE_ID {
128                let mut val = [0u8; 8];
129                val.copy_from_slice(&key[0..8]);
130                let Ok(value) = i64::from_le_bytes(val).try_into();
131                recording_id.set(value);
132            }
133        });
134        let recording_id = recording_id.get();
135        if recording_id < 0 {
136            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
137        }
138
139        Ok(recording_id)
140    }
141}
142
143unsafe extern "C" fn default_encoded_credentials(
144    _clientd: *mut std::os::raw::c_void,
145) -> *mut aeron_archive_encoded_credentials_t {
146    // Allocate a zeroed instance of `aeron_archive_encoded_credentials_t`
147    let empty_credentials = Box::new(aeron_archive_encoded_credentials_t {
148        data: std::ptr::null(),
149        length: 0,
150    });
151    Box::into_raw(empty_credentials)
152}
153
154impl AeronArchive {
155    pub fn aeron(&self) -> Aeron {
156        self.get_archive_context().get_aeron()
157    }
158}
159
160impl AeronArchiveAsyncConnect {
161    #[inline]
162    /// recommend using this method instead of standard `new` as it will link the archive to aeron so if a drop occurs archive is dropped before aeron
163    pub fn new_with_aeron(ctx: &AeronArchiveContext, aeron: &Aeron) -> Result<Self, AeronCError> {
164        let resource_async = Self::new(ctx)?;
165        resource_async.inner.add_dependency(aeron.clone());
166        Ok(resource_async)
167    }
168}
169
170macro_rules! impl_archive_position_methods {
171    ($pub_type:ty) => {
172        impl $pub_type {
173            /// Retrieves the current active live archive position using the Aeron counters.
174            /// Returns an error if not found.
175            pub fn get_archive_position(&self) -> Result<i64, AeronCError> {
176                if let Some(aeron) = self.inner.get_dependency::<Aeron>() {
177                    let counter_reader = &aeron.counters_reader();
178                    self.get_archive_position_with(counter_reader)
179                } else {
180                    Err(AeronCError::from_code(-1))
181                }
182            }
183
184            /// Retrieves the current active live archive position using the provided counter reader.
185            /// Returns an error if not found.
186            pub fn get_archive_position_with(
187                &self,
188                counters: &AeronCountersReader,
189            ) -> Result<i64, AeronCError> {
190                let session_id = self.get_constants()?.session_id();
191                let counter_id = RecordingPos::find_counter_id_by_session(counters, session_id);
192                if counter_id < 0 {
193                    return Err(AeronCError::from_code(counter_id));
194                }
195                let position = counters.get_counter_value(counter_id);
196                if position < 0 {
197                    return Err(AeronCError::from_code(position as i32));
198                }
199                Ok(position)
200            }
201
202            /// Checks if the publication's current position is within a specified inclusive length
203            /// of the archive position.
204            pub fn is_archive_position_with(&self, length_inclusive: usize) -> bool {
205                let archive_position = self.get_archive_position().unwrap_or(-1);
206                if archive_position < 0 {
207                    return false;
208                }
209                self.position() - archive_position <= length_inclusive as i64
210            }
211        }
212    };
213}
214
215impl_archive_position_methods!(AeronPublication);
216impl_archive_position_methods!(AeronExclusivePublication);
217
218impl AeronArchiveContext {
219    // The method below sets no credentials supplier, which is essential for the operation
220    // of the Aeron Archive Context. The `set_credentials_supplier` must be set to prevent
221    // segmentation faults in the C bindings.
222    pub fn set_no_credentials_supplier(&self) -> Result<i32, AeronCError> {
223        self.set_credentials_supplier(
224            Some(default_encoded_credentials),
225            None,
226            None::<&Handler<AeronArchiveCredentialsFreeFuncLogger>>,
227        )
228    }
229
230    /// This method creates a new `AeronArchiveContext` with a no-op credentials supplier.
231    /// If you do not set a credentials supplier, it will segfault.
232    /// This method ensures that a non-functional credentials supplier is set to avoid the segfault.
233    pub fn new_with_no_credentials_supplier(
234        aeron: &Aeron,
235        request_control_channel: &str,
236        response_control_channel: &str,
237        recording_events_channel: &str,
238    ) -> Result<AeronArchiveContext, AeronCError> {
239        let context = Self::new()?;
240        context.set_no_credentials_supplier()?;
241        context.set_aeron(aeron)?;
242        context.set_control_request_channel(&request_control_channel.into_c_string())?;
243        context.set_control_response_channel(&response_control_channel.into_c_string())?;
244        context.set_recording_events_channel(&recording_events_channel.into_c_string())?;
245        // see https://github.com/gsrxyz/rusteron/issues/18
246        context.set_idle_strategy(Some(&Handler::leak(NoOpAeronIdleStrategyFunc)))?;
247        Ok(context)
248    }
249}
250
251#[cfg(test)]
252mod tests {
253    use super::*;
254    use log::{error, info};
255
256    use crate::testing::EmbeddedArchiveMediaDriverProcess;
257    use serial_test::serial;
258    use std::cell::Cell;
259    use std::error;
260    use std::error::Error;
261    use std::str::FromStr;
262    use std::sync::atomic::{AtomicBool, Ordering};
263    use std::sync::Arc;
264    use std::thread::{sleep, JoinHandle};
265    use std::time::{Duration, Instant};
266
267    #[derive(Default, Debug)]
268    struct ErrorCount {
269        error_count: usize,
270    }
271
272    impl AeronErrorHandlerCallback for ErrorCount {
273        fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
274            error!("Aeron error {}: {}", error_code, msg);
275            self.error_count += 1;
276        }
277    }
278
    /// Control request channel handed to the embedded archive media driver in
    /// `test_simple_replay_merge`.
    pub const ARCHIVE_CONTROL_REQUEST: &str = "aeron:udp?endpoint=localhost:8010";
    /// Control response channel handed to the embedded archive media driver in
    /// `test_simple_replay_merge`.
    pub const ARCHIVE_CONTROL_RESPONSE: &str = "aeron:udp?endpoint=localhost:8011";
    /// Recording events channel handed to the embedded archive media driver in
    /// `test_simple_replay_merge`.
    pub const ARCHIVE_RECORDING_EVENTS: &str =
        "aeron:udp?control-mode=dynamic|control=localhost:8012";
283
284    #[test]
285    fn test_uri_string_builder() -> Result<(), AeronCError> {
286        let builder = AeronUriStringBuilder::default();
287
288        builder.init_new()?;
289        builder
290            .media(Media::Udp)? // very important to set media else set_initial_position will give an error of -1
291            .mtu_length(1024 * 64)?
292            .set_initial_position(127424949617280, 1182294755, 65536)?;
293        let uri = builder.build(1024)?;
294        assert_eq!("aeron:udp?term-id=-1168322114|term-length=65536|mtu=65536|init-term-id=1182294755|term-offset=33408", uri);
295
296        builder.init_new()?;
297        let uri = builder
298            .media(Media::Udp)?
299            .control_mode(ControlMode::Dynamic)?
300            .reliable(false)?
301            .ttl(2)?
302            .endpoint("localhost:1235")?
303            .control("localhost:1234")?
304            .build(1024)?;
305        assert_eq!("aeron:udp?ttl=2|control-mode=dynamic|endpoint=localhost:1235|control=localhost:1234|reliable=false", uri);
306
307        let uri = AeronUriStringBuilder::from_str("aeron:udp?endpoint=localhost:8010")?
308            .ttl(5)?
309            .build(1024)?;
310
311        assert_eq!("aeron:udp?ttl=5|endpoint=localhost:8010", uri);
312
313        let uri = uri.parse::<AeronUriStringBuilder>()?.ttl(6)?.build(1024)?;
314
315        assert_eq!("aeron:udp?ttl=6|endpoint=localhost:8010", uri);
316
317        Ok(())
318    }
319
    /// Stream id used for the publication, the recording and the subscription in the
    /// replay-merge test.
    pub const STREAM_ID: i32 = 1033;
    /// Prefix of every message body published by `reply_merge_publisher`.
    pub const MESSAGE_PREFIX: &str = "Message-Prefix-";
    /// `control` endpoint shared by the publication, recording and live-destination channels.
    pub const CONTROL_ENDPOINT: &str = "localhost:23265";
    /// Endpoint of the recording channel passed to `start_recording`.
    pub const RECORDING_ENDPOINT: &str = "localhost:23266";
    /// Endpoint of the live destination given to the replay merge.
    pub const LIVE_ENDPOINT: &str = "localhost:23267";
    /// Endpoint of the replay destination; uses port 0 (presumably so the OS picks a free
    /// port — confirm). The fixed-port alternative is kept below for reference.
    pub const REPLAY_ENDPOINT: &str = "localhost:0";
    // pub const REPLAY_ENDPOINT: &str = "localhost:23268";
327
    /// End-to-end replay-merge test.
    ///
    /// Starts an embedded archive media driver, publishes and records a stream on a background
    /// thread (`reply_merge_publisher`), then creates a second Aeron client plus archive
    /// connection and runs `replay_merge_subscription` against the recorded session. Finally
    /// the publisher thread is told to stop and joined.
    #[test]
    #[serial]
    fn test_simple_replay_merge() -> Result<(), AeronCError> {
        // The result of `try_init` is deliberately discarded.
        let _ = env_logger::Builder::new()
            .is_test(true)
            .filter_level(log::LevelFilter::Info)
            .try_init();

        // Start from a clean slate before checking that the ports are free.
        EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
            .expect("failed to kill all java processes");

        // 23265..=23267 are the CONTROL/RECORDING/LIVE endpoints; 23268 only appears in the
        // commented-out REPLAY_ENDPOINT alternative.
        assert!(is_udp_port_available(23265));
        assert!(is_udp_port_available(23266));
        assert!(is_udp_port_available(23267));
        assert!(is_udp_port_available(23268));
        // Use the nano clock value to give each run its own directory.
        let id = Aeron::nano_clock();
        let aeron_dir = format!("target/aeron/{}/shm", id);
        // NOTE(review): `archive_dir` is never used below; the archive directory passed to the
        // driver is derived from `aeron_dir` instead.
        let archive_dir = format!("target/aeron/{}/archive", id);

        info!("starting archive media driver");
        let media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
            &aeron_dir,
            &format!("{}/archive", aeron_dir),
            ARCHIVE_CONTROL_REQUEST,
            ARCHIVE_CONTROL_RESPONSE,
            ARCHIVE_RECORDING_EVENTS,
        )
        .expect("Failed to start embedded media driver");

        info!("connecting to archive");
        let (archive, aeron) = media_driver
            .archive_connect()
            .expect("Could not connect to archive client");

        // Shared flag the publisher thread polls; cleared at the end of the test.
        let running = Arc::new(AtomicBool::new(true));

        info!("connected to archive, adding publication");
        assert!(!aeron.is_closed());

        let (session_id, publisher_thread) =
            reply_merge_publisher(&archive, aeron.clone(), running.clone())?;

        // The subscriber side uses its own Aeron client and archive connection; both are
        // scoped to this block so they are dropped before the publisher is stopped.
        // NOTE(review): if anything in this block returns early via `?`, `running` is never
        // cleared and the publisher thread is never joined.
        {
            let context = AeronContext::new()?;
            context.set_dir(&media_driver.aeron_dir)?;
            // One leaked error handler is shared by the Aeron context and the archive context.
            let error_handler = Handler::leak(ErrorCount::default());
            context.set_error_handler(Some(&error_handler))?;
            let aeron = Aeron::new(&context)?;
            aeron.start()?;
            // Reuse the channels of the already connected archive for the second connection.
            let aeron_archive_context = archive.get_archive_context();
            let aeron_archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
                &aeron,
                aeron_archive_context.get_control_request_channel(),
                aeron_archive_context.get_control_response_channel(),
                aeron_archive_context.get_recording_events_channel(),
            )?;
            aeron_archive_context.set_error_handler(Some(&error_handler))?;
            let archive = AeronArchiveAsyncConnect::new_with_aeron(&aeron_archive_context, &aeron)?
                .poll_blocking(Duration::from_secs(30))
                .expect("failed to connect to archive");
            replay_merge_subscription(&archive, aeron.clone(), session_id)?;
        }

        // Ask the publisher loop to exit and wait for it; a panic in the thread fails the test.
        running.store(false, Ordering::Release);
        publisher_thread.join().unwrap();

        Ok(())
    }
396
    /// Sets up the publishing side of the replay-merge test.
    ///
    /// Adds a publication on a dynamic-control UDP channel, asks `archive` to record that
    /// publication's session, waits until the publication is connected, and then spawns a
    /// thread that keeps offering `MESSAGE_PREFIX`-numbered messages while `running` is true.
    ///
    /// Returns the publication's session id and the join handle of the publisher thread.
    ///
    /// # Errors
    /// Propagates errors from `add_publication` and `start_recording`.
    fn reply_merge_publisher(
        archive: &AeronArchive,
        aeron: Aeron,
        running: Arc<AtomicBool>,
    ) -> Result<(i32, JoinHandle<()>), AeronCError> {
        let publication = aeron.add_publication(
            // &format!("aeron:udp?control={CONTROL_ENDPOINT}|control-mode=dynamic|term-length=65536|fc=tagged,g:99901/1,t:5s"),
            &format!("aeron:udp?control={CONTROL_ENDPOINT}|control-mode=dynamic|term-length=65536")
                .into_c_string(),
            STREAM_ID,
            Duration::from_secs(5),
        )?;

        info!(
            "publication {} [status={:?}]",
            publication.channel(),
            publication.channel_status()
        );
        // NOTE(review): status 1 presumably means the channel is active — confirm.
        assert_eq!(1, publication.channel_status());

        // Record only this publication's session by pinning `session-id` in the channel.
        let session_id = publication.session_id();
        let recording_channel = format!(
            // "aeron:udp?endpoint={RECORDING_ENDPOINT}|control={CONTROL_ENDPOINT}|session-id={session_id}|gtag=99901"
            "aeron:udp?endpoint={RECORDING_ENDPOINT}|control={CONTROL_ENDPOINT}|session-id={session_id}"
        );
        info!("recording channel {}", recording_channel);
        archive.start_recording(
            &recording_channel.into_c_string(),
            STREAM_ID,
            SOURCE_LOCATION_REMOTE,
            true,
        )?;

        info!("waiting for publisher to be connected");
        // Polls every 100ms; there is no upper bound on how long this waits.
        while !publication.is_connected() {
            thread::sleep(Duration::from_millis(100));
        }
        info!("publisher to be connected");
        // NOTE(review): `counters_reader` is not used in the rest of this function.
        let counters_reader = aeron.counters_reader();
        // Captured by the `move` closure below; counts how often the archive was caught up.
        let mut caught_up_count = 0;
        let publisher_thread = thread::spawn(move || {
            let mut message_count = 0;

            while running.load(Ordering::Acquire) {
                let message = format!("{}{}", MESSAGE_PREFIX, message_count);
                // Retry the offer every 10ms until it returns a positive value.
                while publication.offer(
                    message.as_bytes(),
                    Handlers::no_reserved_value_supplier_handler(),
                ) <= 0
                {
                    thread::sleep(Duration::from_millis(10));
                }
                message_count += 1;
                if message_count % 10_000 == 0 {
                    info!(
                        "Published {} messages [position={}]",
                        message_count,
                        publication.position()
                    );
                }
                // slow down publishing so can catch up
                if message_count > 10_000 {
                    // ensure archiver is caught up
                    while !publication.is_archive_position_with(0) {
                        thread::sleep(Duration::from_micros(300));
                    }
                    caught_up_count += 1;
                }
            }
            // The thread must have published more than 10_000 messages before being stopped.
            assert!(caught_up_count > 0);
            info!("Publisher thread terminated");
        });
        Ok((session_id, publisher_thread))
    }
471
    /// Runs the subscribing side of the replay-merge test.
    ///
    /// Finds the recording-position counter for `session_id`, resolves the recording id from
    /// it, adds a manual control-mode subscription, and drives an `AeronArchiveReplayMerge`
    /// until it reports being merged. Asserts that the merge did not fail, that the live
    /// destination was added, and that more than 10_000 fragments were received.
    ///
    /// # Errors
    /// Propagates errors from resolving the recording id, adding the subscription, creating
    /// the replay merge, and polling the archive.
    ///
    /// # Panics
    /// Panics when an assertion fails or when the archive / Aeron reports an error message
    /// while the merge is idle.
    fn replay_merge_subscription(
        archive: &AeronArchive,
        aeron: Aeron,
        session_id: i32,
    ) -> Result<(), AeronCError> {
        // let replay_channel = format!("aeron:udp?control-mode=manual|session-id={session_id}");
        let replay_channel = format!("aeron:udp?session-id={session_id}").into_c_string();
        info!("replay channel {:?}", replay_channel);

        let replay_destination = format!("aeron:udp?endpoint={REPLAY_ENDPOINT}").into_c_string();
        info!("replay destination {:?}", replay_destination);

        let live_destination =
            format!("aeron:udp?endpoint={LIVE_ENDPOINT}|control={CONTROL_ENDPOINT}")
                .into_c_string();
        info!("live destination {:?}", live_destination);

        let counters_reader = aeron.counters_reader();
        let mut counter_id = -1;

        // Spins (no sleep, no timeout) until a recording-position counter for the session
        // shows up.
        while counter_id < 0 {
            counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
        }
        info!(
            "counter id {} {:?}",
            counter_id,
            counters_reader.get_counter_label(counter_id, 1024)
        );
        info!(
            "counter id {} position={:?}",
            counter_id,
            counters_reader.get_counter_value(counter_id)
        );

        // let recording_id = Cell::new(-1);
        // let start_position = Cell::new(-1);

        // let mut count = 0;
        // assert!(
        //     archive.list_recordings_once(&mut count, 0, 1000, |descriptor| {
        //         info!("Recording descriptor: {:?}", descriptor);
        //         recording_id.set(descriptor.recording_id);
        //         start_position.set(descriptor.start_position);
        //         assert_eq!(descriptor.session_id, session_id);
        //         assert_eq!(descriptor.stream_id, STREAM_ID);
        //     })? >= 0
        // );
        // assert!(count > 0);
        // assert!(recording_id.get() >= 0);

        // let record_id = RecordingPos::get_recording_id(&aeron.counters_reader(), counter_id)?;
        // assert_eq!(recording_id.get(), record_id);
        //
        // let recording_id = recording_id.get();
        // let start_position = start_position.get();
        // Replay from the beginning of the recording; the recording id is read from the
        // counter key, retrying for up to 5 seconds.
        let start_position = 0;
        let recording_id = RecordingPos::get_recording_id_block(
            &aeron.counters_reader(),
            counter_id,
            Duration::from_secs(5),
        )?;

        // Manual control-mode subscription that is handed to the replay merge below.
        let subscribe_channel =
            format!("aeron:udp?control-mode=manual|session-id={session_id}").into_c_string();
        info!("subscribe channel {:?}", subscribe_channel);
        let subscription = aeron.add_subscription(
            &subscribe_channel,
            STREAM_ID,
            Handlers::no_available_image_handler(),
            Handlers::no_unavailable_image_handler(),
            Duration::from_secs(5),
        )?;

        // NOTE(review): the last two arguments are presumably the current epoch time and the
        // merge progress timeout in milliseconds — confirm against the constructor.
        let replay_merge = AeronArchiveReplayMerge::new(
            &subscription,
            &archive,
            &replay_channel,
            &replay_destination,
            &live_destination,
            recording_id,
            start_position,
            Aeron::epoch_clock(),
            10_000,
        )?;

        info!(
            "ReplayMerge initialization: recordingId={}, startPosition={}, subscriptionChannel={:?}, replayChannel={:?}, replayDestination={:?}, liveDestination={:?}",
            recording_id,
            start_position,
            subscribe_channel,
            &replay_channel,
            &replay_destination,
            &live_destination
        );

        // media_driver
        //     .run_aeron_stats()
        //     .expect("Failed to run aeron stats");

        // info!("Waiting for subscription to connect...");
        // while !subscription.is_connected() {
        //     thread::sleep(Duration::from_millis(100));
        // }
        // info!("Subscription connected");

        info!(
            "about to start_replay [maxRecordPosition={:?}]",
            archive.get_max_recorded_position(recording_id)
        );

        // Number of fragments delivered to the handler so far.
        let mut reply_count = 0;
        while !replay_merge.is_merged() {
            assert!(!replay_merge.has_failed());
            // Poll with a fragment limit of 100; the idle branch runs only when the poll
            // returned 0.
            if replay_merge.poll_once(
                |buffer, _header| {
                    reply_count += 1;
                    if reply_count % 10_000 == 0 {
                        info!(
                            "replay-merge [count={}, isMerged={}, isLive={}]",
                            reply_count,
                            replay_merge.is_merged(),
                            replay_merge.is_live_added()
                        );
                    }
                },
                100,
            )? == 0
            {
                // Nothing was polled: surface any archive or Aeron error, service recording
                // signals, then back off for 100ms.
                let err = archive.poll_for_error_response_as_string(4096)?;
                if !err.is_empty() {
                    panic!("{}", err);
                }
                if Aeron::errmsg().len() > 0 && "no error" != Aeron::errmsg() {
                    panic!("{}", Aeron::errmsg());
                }
                archive.poll_for_recording_signals()?;
                thread::sleep(Duration::from_millis(100));
            }
        }
        assert!(!replay_merge.has_failed());
        assert!(replay_merge.is_live_added());
        assert!(reply_count > 10_000);
        Ok(())
    }
616
617    #[test]
618    fn version_check() {
619        let major = unsafe { crate::aeron_version_major() };
620        let minor = unsafe { crate::aeron_version_minor() };
621        let patch = unsafe { crate::aeron_version_patch() };
622
623        let aeron_version = format!("{}.{}.{}", major, minor, patch);
624
625        let cargo_version = "1.48.10";
626        assert_eq!(aeron_version, cargo_version);
627    }
628
629    use std::thread;
630
    /// Starts an embedded archive media driver on freshly chosen UDP ports and prepares a
    /// started Aeron client plus an archive context pointing at it.
    ///
    /// Returns the Aeron client, the archive context (not yet connected) and the media
    /// driver process handle.
    ///
    /// # Errors
    /// Propagates errors from creating/configuring the Aeron context, starting the client
    /// and building the archive context.
    ///
    /// # Panics
    /// Panics when no free port can be found or the media driver process fails to start.
    pub fn start_aeron_archive() -> Result<
        (
            Aeron,
            AeronArchiveContext,
            EmbeddedArchiveMediaDriverProcess,
        ),
        Box<dyn Error>,
    > {
        // Use the nano clock value to give each run its own directories.
        let id = Aeron::nano_clock();
        let aeron_dir = format!("target/aeron/{}/shm", id);
        let archive_dir = format!("target/aeron/{}/archive", id);

        // Pick three ports; each search starts just above the previously found port.
        let request_port = find_unused_udp_port(8000).expect("Could not find port");
        let response_port = find_unused_udp_port(request_port + 1).expect("Could not find port");
        let recording_event_port =
            find_unused_udp_port(response_port + 1).expect("Could not find port");
        let request_control_channel = &format!("aeron:udp?endpoint=localhost:{}", request_port);
        let response_control_channel = &format!("aeron:udp?endpoint=localhost:{}", response_port);
        let recording_events_channel =
            &format!("aeron:udp?endpoint=localhost:{}", recording_event_port);
        assert_ne!(request_control_channel, response_control_channel);

        let archive_media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
            &aeron_dir,
            &archive_dir,
            request_control_channel,
            response_control_channel,
            recording_events_channel,
        )
        .expect("Failed to start Java process");

        // Client context: same directory as the driver, plus leaked logging/error handlers.
        let aeron_context = AeronContext::new()?;
        aeron_context.set_dir(&aeron_dir.into_c_string())?;
        aeron_context.set_client_name(&"test".into_c_string())?;
        aeron_context.set_publication_error_frame_handler(Some(&Handler::leak(
            AeronPublicationErrorFrameHandlerLogger,
        )))?;
        // The same error handler is reused for the archive context further down.
        let error_handler = Handler::leak(ErrorCount::default());
        aeron_context.set_error_handler(Some(&error_handler))?;
        let aeron = Aeron::new(&aeron_context)?;
        aeron.start()?;

        let archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
            &aeron,
            request_control_channel,
            response_control_channel,
            recording_events_channel,
        )?;
        archive_context.set_error_handler(Some(&error_handler))?;
        Ok((aeron, archive_context, archive_media_driver))
    }
682
683    #[test]
684    #[serial]
685    pub fn test_aeron_archive() -> Result<(), Box<dyn error::Error>> {
686        let _ = env_logger::Builder::new()
687            .is_test(true)
688            .filter_level(log::LevelFilter::Info)
689            .try_init();
690        EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
691            .expect("failed to kill all java processes");
692
693        let (aeron, archive_context, media_driver) = start_aeron_archive()?;
694
695        assert!(!aeron.is_closed());
696
697        info!("connected to aeron");
698
699        let archive_connector =
700            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
701        let archive = archive_connector
702            .poll_blocking(Duration::from_secs(30))
703            .expect("failed to connect to aeron archive media driver");
704
705        assert!(archive.get_archive_id() > 0);
706
707        let channel = AERON_IPC_STREAM;
708        let stream_id = 10;
709
710        let subscription_id =
711            archive.start_recording(channel, stream_id, SOURCE_LOCATION_LOCAL, true)?;
712
713        assert!(subscription_id >= 0);
714        info!("subscription id {}", subscription_id);
715
716        let publication = aeron
717            .async_add_exclusive_publication(channel, stream_id)?
718            .poll_blocking(Duration::from_secs(5))?;
719
720        for i in 0..11 {
721            while publication.offer(
722                "123456".as_bytes(),
723                Handlers::no_reserved_value_supplier_handler(),
724            ) <= 0
725            {
726                sleep(Duration::from_millis(50));
727                archive.poll_for_recording_signals()?;
728                let err = archive.poll_for_error_response_as_string(4096)?;
729                if !err.is_empty() {
730                    panic!("{}", err);
731                }
732                archive.idle();
733            }
734            info!("sent message {i} [test_aeron_archive]");
735        }
736
737        archive.idle();
738        let session_id = publication.get_constants()?.session_id;
739        info!("publication session id {}", session_id);
740        // since this is single threaded need to make sure it did write to archiver, usually not required in multi-proccess app
741        let stop_position = publication.position();
742        info!(
743            "publication stop position {} [publication={:?}]",
744            stop_position,
745            publication.get_constants()
746        );
747        let counters_reader = aeron.counters_reader();
748        info!("counters reader ready {:?}", counters_reader);
749
750        let mut counter_id = -1;
751
752        let start = Instant::now();
753        while counter_id <= 0 && start.elapsed() < Duration::from_secs(5) {
754            counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
755            info!("counter id {}", counter_id);
756        }
757
758        assert!(counter_id >= 0);
759
760        info!("counter id {counter_id}, session id {session_id}");
761        while counters_reader.get_counter_value(counter_id) < stop_position {
762            info!(
763                "current archive publication stop position {}",
764                counters_reader.get_counter_value(counter_id)
765            );
766            sleep(Duration::from_millis(50));
767        }
768        info!(
769            "found archive publication stop position {}",
770            counters_reader.get_counter_value(counter_id)
771        );
772
773        archive.stop_recording_channel_and_stream(channel, stream_id)?;
774        drop(publication);
775
776        info!("list recordings");
777        let found_recording_id = Cell::new(-1);
778        let start_pos = Cell::new(-1);
779        let end_pos = Cell::new(-1);
780        let start = Instant::now();
781        while start.elapsed() < Duration::from_secs(5) && found_recording_id.get() == -1 {
782            let mut count = 0;
783            archive.list_recordings_for_uri_once(
784                &mut count,
785                0,
786                i32::MAX,
787                channel,
788                stream_id,
789                |d: AeronArchiveRecordingDescriptor| {
790                    assert_eq!(d.stream_id, stream_id);
791                    info!("found recording {:#?}", d);
792                    info!(
793                        "strippedChannel={}, originalChannel={}",
794                        d.stripped_channel(),
795                        d.original_channel()
796                    );
797                    if d.stop_position > d.start_position && d.stop_position > 0 {
798                        found_recording_id.set(d.recording_id);
799                        start_pos.set(d.start_position);
800                        end_pos.set(d.stop_position);
801                    }
802
803                    // verify clone_struct works
804                    let copy = d.clone_struct();
805                    assert_eq!(copy.deref(), d.deref());
806                    assert_eq!(copy.recording_id, d.recording_id);
807                    assert_eq!(copy.control_session_id, d.control_session_id);
808                    assert_eq!(copy.mtu_length, d.mtu_length);
809                    assert_eq!(copy.source_identity_length, d.source_identity_length);
810                },
811            )?;
812            archive.poll_for_recording_signals()?;
813            let err = archive.poll_for_error_response_as_string(4096)?;
814            if !err.is_empty() {
815                panic!("{}", err);
816            }
817        }
818        assert!(start.elapsed() < Duration::from_secs(5));
819        info!("start replay");
820        let params = AeronArchiveReplayParams::new(
821            0,
822            i32::MAX,
823            start_pos.get(),
824            end_pos.get() - start_pos.get(),
825            0,
826            0,
827        )?;
828        info!("replay params {:#?}", params);
829        let replay_stream_id = 45;
830        let replay_session_id =
831            archive.start_replay(found_recording_id.get(), channel, replay_stream_id, &params)?;
832        let session_id = replay_session_id as i32;
833
834        info!("replay session id {}", replay_session_id);
835        info!("session id {}", session_id);
836        let channel_replay =
837            format!("{}?session-id={}", channel.to_str().unwrap(), session_id).into_c_string();
838        info!("archive id: {}", archive.get_archive_id());
839
840        info!("add subscription {:?}", channel_replay);
841        let subscription = aeron
842            .async_add_subscription(
843                &channel_replay,
844                replay_stream_id,
845                Some(&Handler::leak(AeronAvailableImageLogger)),
846                Some(&Handler::leak(AeronUnavailableImageLogger)),
847            )?
848            .poll_blocking(Duration::from_secs(10))?;
849
850        #[derive(Default)]
851        struct FragmentHandler {
852            count: Cell<usize>,
853        }
854
855        impl AeronFragmentHandlerCallback for FragmentHandler {
856            fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], _header: AeronHeader) {
857                assert_eq!(buffer, "123456".as_bytes());
858
859                // Update count (using Cell for interior mutability)
860                self.count.set(self.count.get() + 1);
861            }
862        }
863
864        let poll = Handler::leak(FragmentHandler::default());
865
866        let start = Instant::now();
867        while start.elapsed() < Duration::from_secs(10) && subscription.poll(Some(&poll), 100)? <= 0
868        {
869            let err = archive.poll_for_error_response_as_string(4096)?;
870            if !err.is_empty() {
871                panic!("{}", err);
872            }
873        }
874        assert!(
875            start.elapsed() < Duration::from_secs(10),
876            "messages not received {:?}",
877            poll.count
878        );
879        info!("aeron {:?}", aeron);
880        info!("ctx {:?}", archive_context);
881        assert_eq!(11, poll.count.get());
882        Ok(())
883    }
884
885    #[test]
886    #[serial]
887    fn test_invalid_recording_channel() -> Result<(), Box<dyn Error>> {
888        let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
889        let archive_connector =
890            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
891        let archive = archive_connector
892            .poll_blocking(Duration::from_secs(30))
893            .expect("failed to connect to archive");
894
895        let invalid_channel = "invalid:channel".into_c_string();
896        let result =
897            archive.start_recording(&invalid_channel, STREAM_ID, SOURCE_LOCATION_LOCAL, true);
898        assert!(
899            result.is_err(),
900            "Expected error when starting recording with an invalid channel"
901        );
902        Ok(())
903    }
904
905    #[test]
906    #[serial]
907    fn test_stop_recording_on_nonexistent_channel() -> Result<(), Box<dyn Error>> {
908        let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
909        let archive_connector =
910            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
911        let archive = archive_connector
912            .poll_blocking(Duration::from_secs(30))
913            .expect("failed to connect to archive");
914
915        let nonexistent_channel = &"aeron:udp?endpoint=localhost:9999".into_c_string();
916        let result = archive.stop_recording_channel_and_stream(nonexistent_channel, STREAM_ID);
917        assert!(
918            result.is_err(),
919            "Expected error when stopping recording on a non-existent channel"
920        );
921        Ok(())
922    }
923
924    #[test]
925    #[serial]
926    fn test_replay_with_invalid_recording_id() -> Result<(), Box<dyn Error>> {
927        let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
928        let archive_connector =
929            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
930        let archive = archive_connector
931            .poll_blocking(Duration::from_secs(30))
932            .expect("failed to connect to archive");
933
934        let invalid_recording_id = -999;
935        let params = AeronArchiveReplayParams::new(0, i32::MAX, 0, 100, 0, 0)?;
936        let result = archive.start_replay(
937            invalid_recording_id,
938            &"aeron:udp?endpoint=localhost:8888".into_c_string(),
939            STREAM_ID,
940            &params,
941        );
942        assert!(
943            result.is_err(),
944            "Expected error when starting replay with an invalid recording id"
945        );
946        Ok(())
947    }
948
949    #[test]
950    #[serial]
951    fn test_archive_reconnect_after_close() -> Result<(), Box<dyn std::error::Error>> {
952        let (aeron, archive_context, media_driver) = start_aeron_archive()?;
953        let archive_connector =
954            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
955        let archive = archive_connector
956            .poll_blocking(Duration::from_secs(30))
957            .expect("failed to connect to archive");
958
959        drop(archive);
960
961        let archive_connector = AeronArchiveAsyncConnect::new_with_aeron(&archive_context, &aeron)?;
962        let new_archive = archive_connector
963            .poll_blocking(Duration::from_secs(30))
964            .expect("failed to reconnect to archive");
965        assert!(
966            new_archive.get_archive_id() > 0,
967            "Reconnected archive should have a valid archive id"
968        );
969
970        drop(media_driver);
971        Ok(())
972    }
973}
974
// Slow-consumer tests live in their own module, compiled only for test builds.
// To run them: `just slow-tests`
#[cfg(test)]
mod slow_consumer_test;