rusteron_archive/
testing.rs

1use crate::IntoCString;
2use crate::{
3    Aeron, AeronArchive, AeronArchiveAsyncConnect, AeronArchiveContext, AeronContext, Handler,
4    NoOpAeronIdleStrategyFunc,
5};
6use log::info;
7use log::{error, warn};
8use regex::Regex;
9use std::backtrace::Backtrace;
10use std::ffi::CString;
11use std::path::Path;
12use std::process::{Child, Command, ExitStatus, Stdio};
13use std::thread::sleep;
14use std::time::{Duration, Instant};
15use std::{fs, io, panic, process};
16
17pub struct EmbeddedArchiveMediaDriverProcess {
18    child: Child,
19    pub aeron_dir: CString,
20    pub archive_dir: CString,
21    pub control_request_channel: String,
22    pub control_response_channel: String,
23    pub recording_events_channel: String,
24}
25
26impl EmbeddedArchiveMediaDriverProcess {
27    /// Builds the Aeron Archive project and starts an embedded Aeron Archive Media Driver process.
28    ///
29    /// This function ensures that the necessary Aeron `.jar` files are built using Gradle. If the required
30    /// `.jar` files are not found in the expected directory, it runs the Gradle build tasks to generate them.
31    /// Once the build is complete, it invokes the `start` function to initialize and run the Aeron Archive Media Driver.
32    ///
33    /// # Parameters
34    /// - `aeron_dir`: The directory for the Aeron media driver to use for its IPC mechanisms.
35    /// - `archive_dir`: The directory where the Aeron Archive will store its recordings and metadata.
36    /// - `control_request_channel`: The channel URI used for sending control requests to the Aeron Archive.
37    /// - `control_response_channel`: The channel URI used for receiving control responses from the Aeron Archive.
38    /// - `recording_events_channel`: The channel URI used for receiving recording event notifications from the Aeron Archive.
39    ///
40    /// # Returns
41    /// On success, returns an instance of `EmbeddedArchiveMediaDriverProcess` encapsulating the child process
42    /// and configuration used. Returns an `io::Result` if the process fails to start or the build fails.
43    ///
44    /// # Errors
45    /// Returns an `io::Result::Err` if:
46    /// - The Gradle build fails to execute or complete.
47    /// - The required `.jar` files are still not found after building.
48    /// - The `start` function encounters an error starting the process.
49    ///
50    /// # Example
51    /// ```
52    /// use rusteron_archive::testing::EmbeddedArchiveMediaDriverProcess;
53    /// let driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
54    ///     "/tmp/aeron-dir",
55    ///     "/tmp/archive-dir",
56    ///     "aeron:udp?endpoint=localhost:8010",
57    ///     "aeron:udp?endpoint=localhost:8011",
58    ///     "aeron:udp?endpoint=localhost:8012",
59    /// ).expect("Failed to build and start Aeron Archive Media Driver");
60    /// ```
61    ///
62    /// # Notes
63    /// - This function assumes the presence of a Gradle wrapper script (`gradlew` or `gradlew.bat`)
64    ///   in the `aeron` directory relative to the project's root (`CARGO_MANIFEST_DIR`).
65    /// - The required `.jar` files will be generated in `aeron/aeron-all/build/libs` if not already present.
66    /// - The `build_and_start` function is a convenience wrapper for automating the build and initialization process.
67    pub fn build_and_start(
68        aeron_dir: &str,
69        archive_dir: &str,
70        control_request_channel: &str,
71        control_response_channel: &str,
72        recording_events_channel: &str,
73    ) -> io::Result<Self> {
74        let path = std::path::MAIN_SEPARATOR;
75        let gradle = if cfg!(target_os = "windows") {
76            &format!("{}{path}aeron{path}gradlew.bat", env!("CARGO_MANIFEST_DIR"),)
77        } else {
78            "./gradlew"
79        };
80        let dir = format!("{}{path}aeron", env!("CARGO_MANIFEST_DIR"),);
81        info!("running {} in {}", gradle, dir);
82
83        if !Path::new(&format!(
84            "{}{path}aeron{path}aeron-all{path}build{path}libs",
85            env!("CARGO_MANIFEST_DIR")
86        ))
87        .exists()
88        {
89            Command::new(&gradle)
90                .current_dir(dir)
91                .args([
92                    ":aeron-agent:jar",
93                    ":aeron-samples:jar",
94                    ":aeron-archive:jar",
95                    ":aeron-all:build",
96                ])
97                .stdout(Stdio::inherit())
98                .stderr(Stdio::inherit())
99                .spawn()?
100                .wait()?;
101        }
102
103        return Self::start(
104            &aeron_dir,
105            archive_dir,
106            control_request_channel,
107            control_response_channel,
108            recording_events_channel,
109        );
110    }
111
112    pub fn run_aeron_stats(&self) -> std::io::Result<Child> {
113        let main_dir = env!("CARGO_MANIFEST_DIR");
114        let dir = format!("{}/{}", main_dir, &self.aeron_dir.to_str().unwrap());
115        info!("running 'just aeron-stat {}'", dir);
116        Command::new("just")
117            .args(["aeron-stat", dir.as_str()])
118            .stdout(Stdio::inherit())
119            .stderr(Stdio::inherit())
120            .spawn()
121    }
122
123    pub fn archive_connect(&self) -> Result<(AeronArchive, Aeron), io::Error> {
124        let start = Instant::now();
125        while start.elapsed() < Duration::from_secs(30) {
126            if let Ok(aeron_context) = AeronContext::new() {
127                aeron_context.set_dir(&self.aeron_dir).expect("invalid dir");
128                aeron_context
129                    .set_client_name(&CString::new("unit_test_client")?)
130                    .expect("invalid client name");
131                if let Ok(aeron) = Aeron::new(&aeron_context) {
132                    if aeron.start().is_ok() {
133                        if let Ok(archive_context) =
134                            AeronArchiveContext::new_with_no_credentials_supplier(
135                                &aeron,
136                                &self.control_request_channel,
137                                &self.control_response_channel,
138                                &self.recording_events_channel,
139                            )
140                        {
141                            archive_context
142                                .set_idle_strategy(Some(&Handler::leak(NoOpAeronIdleStrategyFunc)))
143                                .expect("unable to set idle strategy");
144                            if let Ok(connect) =
145                                AeronArchiveAsyncConnect::new_with_aeron(&archive_context, &aeron)
146                            {
147                                if let Ok(archive) = connect.poll_blocking(Duration::from_secs(10))
148                                {
149                                    let i = archive.get_archive_id();
150                                    assert!(i > 0);
151                                    info!("aeron archive media driver is up [connected with archive id {i}]");
152                                    sleep(Duration::from_millis(100));
153                                    return Ok((archive, aeron));
154                                };
155                            }
156                        }
157                        error!("aeron error: {}", Aeron::errmsg());
158                    }
159                }
160            }
161            info!("waiting for aeron to start up, retrying...");
162        }
163
164        assert!(
165            start.elapsed() < Duration::from_secs(30),
166            "failed to start up aeron media driver"
167        );
168
169        return Err(std::io::Error::other(
170            "unable to start up aeron media driver client",
171        ));
172    }
173
174    /// Starts an embedded Aeron Archive Media Driver process with the specified configurations.
175    ///
176    /// This function cleans and recreates the Aeron and archive directories, configures the JVM to run
177    /// the Aeron Archive Media Driver, and starts the process with the specified control channels.
178    /// It ensures that the environment is correctly prepared for Aeron communication.
179    ///
180    /// # Parameters
181    /// - `aeron_dir`: The directory for the Aeron media driver to use for its IPC mechanisms.
182    /// - `archive_dir`: The directory where the Aeron Archive will store its recordings and metadata.
183    /// - `control_request_channel`: The channel URI used for sending control requests to the Aeron Archive.
184    /// - `control_response_channel`: The channel URI used for receiving control responses from the Aeron Archive.
185    /// - `recording_events_channel`: The channel URI used for receiving recording event notifications from the Aeron Archive.
186    ///
187    /// # Returns
188    /// On success, returns an instance of `EmbeddedArchiveMediaDriverProcess` encapsulating the child process
189    /// and configuration used. Returns an `io::Result` if the process fails to start.
190    ///
191    /// # Errors
192    /// Returns an `io::Result::Err` if:
193    /// - Cleaning or creating the directories fails.
194    /// - The required `.jar` files are missing or not found.
195    /// - The Java process fails to start.
196    ///
197    /// # Example
198    /// ```
199    /// use rusteron_archive::testing::EmbeddedArchiveMediaDriverProcess;
200    /// let driver = EmbeddedArchiveMediaDriverProcess::start(
201    ///     "/tmp/aeron-dir",
202    ///     "/tmp/archive-dir",
203    ///     "aeron:udp?endpoint=localhost:8010",
204    ///     "aeron:udp?endpoint=localhost:8011",
205    ///     "aeron:udp?endpoint=localhost:8012",
206    /// ).expect("Failed to start Aeron Archive Media Driver");
207    /// ```
208    ///
209    /// # Notes
210    /// - The Aeron `.jar` files must be available under the directory `aeron/aeron-all/build/libs` relative
211    ///   to the project's root (`CARGO_MANIFEST_DIR`).
212    /// - The function configures the JVM with properties for Aeron, such as enabling event logging and disabling bounds checks.
213    pub fn start(
214        aeron_dir: &str,
215        archive_dir: &str,
216        control_request_channel: &str,
217        control_response_channel: &str,
218        recording_events_channel: &str,
219    ) -> io::Result<Self> {
220        Self::clean_directory(aeron_dir)?;
221        Self::clean_directory(archive_dir)?;
222
223        // Ensure directories are recreated
224        fs::create_dir_all(aeron_dir)?;
225        fs::create_dir_all(archive_dir)?;
226
227        let binding = fs::read_dir(format!(
228            "{}/aeron/aeron-all/build/libs",
229            env!("CARGO_MANIFEST_DIR")
230        ))?
231        .filter(|f| f.is_ok())
232        .map(|f| f.unwrap())
233        .filter(|f| {
234            f.file_name()
235                .to_string_lossy()
236                .to_string()
237                .ends_with(".jar")
238        })
239        .next()
240        .unwrap()
241        .path();
242        let mut jar_path = binding.to_str().unwrap();
243        let mut agent_jar = jar_path.replace("aeron-all", "aeron-agent");
244
245        assert!(fs::exists(jar_path).unwrap_or_default());
246        if fs::exists(&agent_jar).unwrap_or_default() {
247            agent_jar = format!("-javaagent:{}", agent_jar);
248        } else {
249            agent_jar = " ".to_string();
250        }
251        let separator = if cfg!(target_os = "windows") {
252            ";"
253        } else {
254            ":"
255        };
256
257        let combined_jars = format!(
258            "{}{separator}{}",
259            jar_path,
260            jar_path.replace("aeron-all", "aeron-archive")
261        );
262        jar_path = &combined_jars;
263
264        let args = [
265            agent_jar.as_str(),
266            "--add-opens",
267            "java.base/jdk.internal.misc=ALL-UNNAMED",
268            "-cp",
269            jar_path,
270            &format!("-Daeron.dir={}", aeron_dir),
271            &format!("-Daeron.archive.dir={}", archive_dir),
272            "-Daeron.spies.simulate.connection=true",
273            // "-Daeron.event.log=admin", // this will only work if an agent is built
274            "-Daeron.event.log=all", // this will only work if an agent is built
275            "-Daeron.event.log.disable=FRAME_IN,FRAME_OUT", // this will only work if an agent is built
276            "-Daeron.event.archive.log=all",
277            "-Daeron.event.cluster.log=all",
278            // "-Daeron.term.buffer.sparse.file=false",
279            // "-Daeron.pre.touch.mapped.memory=true",
280            // "-Daeron.threading.mode=DEDICATED",
281            // "-Daeron.sender.idle.strategy=noop",
282            // "-Daeron.receiver.idle.strategy=noop",
283            // "-Daeron.conductor.idle.strategy=spin",
284            "-Dagrona.disable.bounds.checks=true",
285            &format!(
286                "-Daeron.archive.control.channel={}",
287                control_request_channel
288            ),
289            &format!(
290                "-Daeron.archive.control.response.channel={}",
291                control_response_channel
292            ),
293            &format!(
294                "-Daeron.archive.recording.events.channel={}",
295                recording_events_channel
296            ),
297            "-Daeron.archive.replication.channel=aeron:udp?endpoint=localhost:0",
298            "io.aeron.archive.ArchivingMediaDriver",
299        ];
300
301        info!(
302            "starting archive media driver [\n\tjava {}\n]",
303            args.join(" ")
304        );
305
306        let child = Command::new("java")
307            .args(args)
308            .stdout(Stdio::inherit())
309            .stderr(Stdio::inherit())
310            .spawn()?;
311
312        info!(
313            "started archive media driver [{:?}",
314            fs::read_dir(aeron_dir)?.collect::<Vec<_>>()
315        );
316
317        Ok(EmbeddedArchiveMediaDriverProcess {
318            child,
319            aeron_dir: aeron_dir.into_c_string(),
320            archive_dir: archive_dir.into_c_string(),
321            control_request_channel: control_request_channel.to_string(),
322            control_response_channel: control_response_channel.to_string(),
323            recording_events_channel: recording_events_channel.to_string(),
324        })
325    }
326
327    fn clean_directory(dir: &str) -> io::Result<()> {
328        info!("cleaning directory {}", dir);
329        let path = Path::new(dir);
330        if path.exists() {
331            fs::remove_dir_all(path)?;
332        }
333        Ok(())
334    }
335
336    pub fn kill_all_java_processes() -> io::Result<ExitStatus> {
337        if cfg!(not(target_os = "windows")) {
338            return Ok(std::process::Command::new("pkill")
339                .args(["-9", "java"])
340                .stdout(Stdio::inherit())
341                .stderr(Stdio::inherit())
342                .spawn()?
343                .wait()?);
344        }
345        Ok(ExitStatus::default())
346    }
347}
348
349// Use the Drop trait to ensure process cleanup and directory removal after test completion
350impl Drop for EmbeddedArchiveMediaDriverProcess {
351    fn drop(&mut self) {
352        warn!("WARN: stopping aeron archive media driver!!!!");
353        // Attempt to kill the Java process if it’s still running
354        if let Err(e) = self.child.kill() {
355            error!("Failed to kill Java process: {}", e);
356        }
357
358        // Clean up directories after the process has terminated
359        if let Err(e) = Self::clean_directory(&self.aeron_dir.to_str().unwrap()) {
360            error!("Failed to clean up Aeron directory: {}", e);
361        }
362        if let Err(e) = Self::clean_directory(&self.archive_dir.to_str().unwrap()) {
363            error!("Failed to clean up Archive directory: {}", e);
364        }
365    }
366}
367
368pub fn set_panic_hook() {
369    panic::set_hook(Box::new(|info| {
370        // Get the backtrace
371        let backtrace = Backtrace::force_capture();
372        error!("Stack trace: {backtrace:#?}");
373
374        let backtrace = format!("{:?}", backtrace);
375        // Regular expression to match the function, file, and line
376        let re = Regex::new(r#"fn: "([^"]+)", file: "([^"]+)", line: (\d+)"#).unwrap();
377
378        // Extract and print in IntelliJ format with function
379        for cap in re.captures_iter(&backtrace) {
380            let function = &cap[1];
381            let file = &cap[2];
382            let line = &cap[3];
383            info!("{file}:{line} in {function}");
384        }
385
386        error!("Panic occurred: {:#?}", info);
387
388        if let Some(payload) = info.payload().downcast_ref::<&str>() {
389            error!("Panic message: {}", payload);
390        } else if let Some(payload) = info.payload().downcast_ref::<String>() {
391            error!("Panic message: {}", payload);
392        } else {
393            // If it's not a &str or String, try to print it as Debug
394            error!(
395                "Panic with non-standard payload: {:?}",
396                info.payload().type_id()
397            );
398        }
399
400        warn!("shutdown");
401
402        process::abort();
403    }))
404}