rusteron_archive/
testing.rs

1use crate::IntoCString;
2use crate::{Aeron, AeronArchive, AeronArchiveAsyncConnect, AeronArchiveContext, AeronContext};
3use log::info;
4use log::{error, warn};
5use regex::Regex;
6use std::backtrace::Backtrace;
7use std::ffi::CString;
8use std::path::Path;
9use std::process::{Child, Command, ExitStatus, Stdio};
10use std::thread::sleep;
11use std::time::{Duration, Instant};
12use std::{fs, io, panic, process};
13
14pub struct EmbeddedArchiveMediaDriverProcess {
15    child: Child,
16    pub aeron_dir: CString,
17    pub archive_dir: CString,
18    pub control_request_channel: String,
19    pub control_response_channel: String,
20    pub recording_events_channel: String,
21}
22
23impl EmbeddedArchiveMediaDriverProcess {
24    /// Builds the Aeron Archive project and starts an embedded Aeron Archive Media Driver process.
25    ///
26    /// This function ensures that the necessary Aeron `.jar` files are built using Gradle. If the required
27    /// `.jar` files are not found in the expected directory, it runs the Gradle build tasks to generate them.
28    /// Once the build is complete, it invokes the `start` function to initialize and run the Aeron Archive Media Driver.
29    ///
30    /// # Parameters
31    /// - `aeron_dir`: The directory for the Aeron media driver to use for its IPC mechanisms.
32    /// - `archive_dir`: The directory where the Aeron Archive will store its recordings and metadata.
33    /// - `control_request_channel`: The channel URI used for sending control requests to the Aeron Archive.
34    /// - `control_response_channel`: The channel URI used for receiving control responses from the Aeron Archive.
35    /// - `recording_events_channel`: The channel URI used for receiving recording event notifications from the Aeron Archive.
36    ///
37    /// # Returns
38    /// On success, returns an instance of `EmbeddedArchiveMediaDriverProcess` encapsulating the child process
39    /// and configuration used. Returns an `io::Result` if the process fails to start or the build fails.
40    ///
41    /// # Errors
42    /// Returns an `io::Result::Err` if:
43    /// - The Gradle build fails to execute or complete.
44    /// - The required `.jar` files are still not found after building.
45    /// - The `start` function encounters an error starting the process.
46    ///
47    /// # Example
48    /// ```
49    /// use rusteron_archive::testing::EmbeddedArchiveMediaDriverProcess;
50    /// let driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
51    ///     "/tmp/aeron-dir",
52    ///     "/tmp/archive-dir",
53    ///     "aeron:udp?endpoint=localhost:8010",
54    ///     "aeron:udp?endpoint=localhost:8011",
55    ///     "aeron:udp?endpoint=localhost:8012",
56    /// ).expect("Failed to build and start Aeron Archive Media Driver");
57    /// ```
58    ///
59    /// # Notes
60    /// - This function assumes the presence of a Gradle wrapper script (`gradlew` or `gradlew.bat`)
61    ///   in the `aeron` directory relative to the project's root (`CARGO_MANIFEST_DIR`).
62    /// - The required `.jar` files will be generated in `aeron/aeron-all/build/libs` if not already present.
63    /// - The `build_and_start` function is a convenience wrapper for automating the build and initialization process.
64    pub fn build_and_start(
65        aeron_dir: &str,
66        archive_dir: &str,
67        control_request_channel: &str,
68        control_response_channel: &str,
69        recording_events_channel: &str,
70    ) -> io::Result<Self> {
71        let path = std::path::MAIN_SEPARATOR;
72        let gradle = if cfg!(target_os = "windows") {
73            &format!("{}{path}aeron{path}gradlew.bat", env!("CARGO_MANIFEST_DIR"),)
74        } else {
75            "./gradlew"
76        };
77        let dir = format!("{}{path}aeron", env!("CARGO_MANIFEST_DIR"),);
78        info!("running {} in {}", gradle, dir);
79
80        if !Path::new(&format!(
81            "{}{path}aeron{path}aeron-all{path}build{path}libs",
82            env!("CARGO_MANIFEST_DIR")
83        ))
84        .exists()
85        {
86            Command::new(&gradle)
87                .current_dir(dir)
88                .args([
89                    ":aeron-agent:jar",
90                    ":aeron-samples:jar",
91                    ":aeron-archive:jar",
92                    ":aeron-all:build",
93                ])
94                .stdout(Stdio::inherit())
95                .stderr(Stdio::inherit())
96                .spawn()?
97                .wait()?;
98        }
99
100        return Self::start(
101            &aeron_dir,
102            archive_dir,
103            control_request_channel,
104            control_response_channel,
105            recording_events_channel,
106        );
107    }
108
109    pub fn run_aeron_stats(&self) -> std::io::Result<Child> {
110        let main_dir = env!("CARGO_MANIFEST_DIR");
111        let dir = format!("{}/{}", main_dir, &self.aeron_dir.to_str().unwrap());
112        info!("running 'just aeron-stat {}'", dir);
113        Command::new("just")
114            .args(["aeron-stat", dir.as_str()])
115            .stdout(Stdio::inherit())
116            .stderr(Stdio::inherit())
117            .spawn()
118    }
119
120    pub fn archive_connect(&self) -> Result<(AeronArchive, Aeron), io::Error> {
121        let start = Instant::now();
122        while start.elapsed() < Duration::from_secs(30) {
123            if let Ok(aeron_context) = AeronContext::new() {
124                aeron_context.set_dir(&self.aeron_dir).expect("invalid dir");
125                aeron_context
126                    .set_client_name(&CString::new("unit_test_client")?)
127                    .expect("invalid client name");
128                if let Ok(aeron) = Aeron::new(&aeron_context) {
129                    if aeron.start().is_ok() {
130                        if let Ok(archive_context) =
131                            AeronArchiveContext::new_with_no_credentials_supplier(
132                                &aeron,
133                                &self.control_request_channel,
134                                &self.control_response_channel,
135                                &self.recording_events_channel,
136                            )
137                        {
138                            if let Ok(connect) =
139                                AeronArchiveAsyncConnect::new_with_aeron(&archive_context, &aeron)
140                            {
141                                if let Ok(archive) = connect.poll_blocking(Duration::from_secs(10))
142                                {
143                                    let i = archive.get_archive_id();
144                                    assert!(i > 0);
145                                    info!("aeron archive media driver is up [connected with archive id {i}]");
146                                    sleep(Duration::from_millis(100));
147                                    return Ok((archive, aeron));
148                                };
149                            }
150                        }
151                        error!("aeron error: {}", Aeron::errmsg());
152                    }
153                }
154            }
155            info!("waiting for aeron to start up, retrying...");
156        }
157
158        assert!(
159            start.elapsed() < Duration::from_secs(30),
160            "failed to start up aeron media driver"
161        );
162
163        return Err(std::io::Error::other(
164            "unable to start up aeron media driver client",
165        ));
166    }
167
168    /// Starts an embedded Aeron Archive Media Driver process with the specified configurations.
169    ///
170    /// This function cleans and recreates the Aeron and archive directories, configures the JVM to run
171    /// the Aeron Archive Media Driver, and starts the process with the specified control channels.
172    /// It ensures that the environment is correctly prepared for Aeron communication.
173    ///
174    /// # Parameters
175    /// - `aeron_dir`: The directory for the Aeron media driver to use for its IPC mechanisms.
176    /// - `archive_dir`: The directory where the Aeron Archive will store its recordings and metadata.
177    /// - `control_request_channel`: The channel URI used for sending control requests to the Aeron Archive.
178    /// - `control_response_channel`: The channel URI used for receiving control responses from the Aeron Archive.
179    /// - `recording_events_channel`: The channel URI used for receiving recording event notifications from the Aeron Archive.
180    ///
181    /// # Returns
182    /// On success, returns an instance of `EmbeddedArchiveMediaDriverProcess` encapsulating the child process
183    /// and configuration used. Returns an `io::Result` if the process fails to start.
184    ///
185    /// # Errors
186    /// Returns an `io::Result::Err` if:
187    /// - Cleaning or creating the directories fails.
188    /// - The required `.jar` files are missing or not found.
189    /// - The Java process fails to start.
190    ///
191    /// # Example
192    /// ```
193    /// use rusteron_archive::testing::EmbeddedArchiveMediaDriverProcess;
194    /// let driver = EmbeddedArchiveMediaDriverProcess::start(
195    ///     "/tmp/aeron-dir",
196    ///     "/tmp/archive-dir",
197    ///     "aeron:udp?endpoint=localhost:8010",
198    ///     "aeron:udp?endpoint=localhost:8011",
199    ///     "aeron:udp?endpoint=localhost:8012",
200    /// ).expect("Failed to start Aeron Archive Media Driver");
201    /// ```
202    ///
203    /// # Notes
204    /// - The Aeron `.jar` files must be available under the directory `aeron/aeron-all/build/libs` relative
205    ///   to the project's root (`CARGO_MANIFEST_DIR`).
206    /// - The function configures the JVM with properties for Aeron, such as enabling event logging and disabling bounds checks.
207    pub fn start(
208        aeron_dir: &str,
209        archive_dir: &str,
210        control_request_channel: &str,
211        control_response_channel: &str,
212        recording_events_channel: &str,
213    ) -> io::Result<Self> {
214        Self::clean_directory(aeron_dir)?;
215        Self::clean_directory(archive_dir)?;
216
217        // Ensure directories are recreated
218        fs::create_dir_all(aeron_dir)?;
219        fs::create_dir_all(archive_dir)?;
220
221        let binding = fs::read_dir(format!(
222            "{}/aeron/aeron-all/build/libs",
223            env!("CARGO_MANIFEST_DIR")
224        ))?
225        .filter(|f| f.is_ok())
226        .map(|f| f.unwrap())
227        .filter(|f| {
228            f.file_name()
229                .to_string_lossy()
230                .to_string()
231                .ends_with(".jar")
232        })
233        .next()
234        .unwrap()
235        .path();
236        let mut jar_path = binding.to_str().unwrap();
237        let agent_jar = jar_path.replace("aeron-all", "aeron-agent");
238
239        assert!(fs::exists(jar_path).unwrap_or_default());
240        let mut args = vec![];
241
242        if fs::exists(&agent_jar).unwrap_or_default() {
243            args.push(format!("-javaagent:{}", agent_jar));
244        }
245        let separator = if cfg!(target_os = "windows") {
246            ";"
247        } else {
248            ":"
249        };
250
251        let combined_jars = format!(
252            "{}{separator}{}",
253            jar_path,
254            jar_path.replace("aeron-all", "aeron-archive")
255        );
256        jar_path = &combined_jars;
257
258        args.push("--add-opens".to_string());
259        args.push("java.base/jdk.internal.misc=ALL-UNNAMED".to_string());
260        args.push("-cp".to_string());
261        args.push(jar_path.to_string());
262        args.push(format!("-Daeron.dir={}", aeron_dir));
263        args.push(format!("-Daeron.archive.dir={}", archive_dir));
264        args.push("-Daeron.spies.simulate.connection=true".to_string());
265        args.push("-Daeron.event.log=all".to_string());
266        args.push("-Daeron.event.log.disable=FRAME_IN,FRAME_OUT".to_string());
267        args.push("-Daeron.event.archive.log=all".to_string());
268        args.push("-Daeron.event.cluster.log=all".to_string());
269        args.push("-Dagrona.disable.bounds.checks=true".to_string());
270        args.push(format!(
271            "-Daeron.archive.control.channel={}",
272            control_request_channel
273        ));
274        args.push(format!(
275            "-Daeron.archive.control.response.channel={}",
276            control_response_channel
277        ));
278        args.push(format!(
279            "-Daeron.archive.recording.events.channel={}",
280            recording_events_channel
281        ));
282        args.push("-Daeron.archive.replication.channel=aeron:udp?endpoint=localhost:0".to_string());
283        args.push("-Daeron.client.liveness.timeout=60000000000".to_string());
284        args.push("-Daeron.image.liveness.timeout=60000000000".to_string());
285        args.push("-Daeron.publication.unblock.timeout=65000000000".to_string());
286        args.push("io.aeron.archive.ArchivingMediaDriver".to_string());
287
288        info!(
289            "starting archive media driver [\n\tjava {}\n]",
290            args.join(" ")
291        );
292
293        let child = Command::new("java")
294            .args(args)
295            .stdout(Stdio::inherit())
296            .stderr(Stdio::inherit())
297            .spawn()?;
298
299        info!(
300            "started archive media driver [{:?}",
301            fs::read_dir(aeron_dir)?.collect::<Vec<_>>()
302        );
303
304        Ok(EmbeddedArchiveMediaDriverProcess {
305            child,
306            aeron_dir: aeron_dir.into_c_string(),
307            archive_dir: archive_dir.into_c_string(),
308            control_request_channel: control_request_channel.to_string(),
309            control_response_channel: control_response_channel.to_string(),
310            recording_events_channel: recording_events_channel.to_string(),
311        })
312    }
313
314    fn clean_directory(dir: &str) -> io::Result<()> {
315        info!("cleaning directory {}", dir);
316        let path = Path::new(dir);
317        if path.exists() {
318            fs::remove_dir_all(path)?;
319        }
320        Ok(())
321    }
322
323    pub fn kill_all_java_processes() -> io::Result<ExitStatus> {
324        if cfg!(not(target_os = "windows")) {
325            return Ok(std::process::Command::new("pkill")
326                .args(["-9", "java"])
327                .stdout(Stdio::inherit())
328                .stderr(Stdio::inherit())
329                .spawn()?
330                .wait()?);
331        }
332        Ok(ExitStatus::default())
333    }
334}
335
336// Use the Drop trait to ensure process cleanup and directory removal after test completion
337impl Drop for EmbeddedArchiveMediaDriverProcess {
338    fn drop(&mut self) {
339        warn!("WARN: stopping aeron archive media driver!!!!");
340        // Attempt to kill the Java process if it’s still running
341        if let Err(e) = self.child.kill() {
342            error!("Failed to kill Java process: {}", e);
343        }
344
345        // Clean up directories after the process has terminated
346        if let Err(e) = Self::clean_directory(&self.aeron_dir.to_str().unwrap()) {
347            error!("Failed to clean up Aeron directory: {}", e);
348        }
349        if let Err(e) = Self::clean_directory(&self.archive_dir.to_str().unwrap()) {
350            error!("Failed to clean up Archive directory: {}", e);
351        }
352    }
353}
354
355pub fn set_panic_hook() {
356    panic::set_hook(Box::new(|info| {
357        // Get the backtrace
358        let backtrace = Backtrace::force_capture();
359        error!("Stack trace: {backtrace:#?}");
360
361        let backtrace = format!("{:?}", backtrace);
362        // Regular expression to match the function, file, and line
363        let re = Regex::new(r#"fn: "([^"]+)", file: "([^"]+)", line: (\d+)"#).unwrap();
364
365        // Extract and print in IntelliJ format with function
366        for cap in re.captures_iter(&backtrace) {
367            let function = &cap[1];
368            let file = &cap[2];
369            let line = &cap[3];
370            info!("{file}:{line} in {function}");
371        }
372
373        error!("Panic occurred: {:#?}", info);
374
375        if let Some(payload) = info.payload().downcast_ref::<&str>() {
376            error!("Panic message: {}", payload);
377        } else if let Some(payload) = info.payload().downcast_ref::<String>() {
378            error!("Panic message: {}", payload);
379        } else {
380            // If it's not a &str or String, try to print it as Debug
381            error!(
382                "Panic with non-standard payload: {:?}",
383                info.payload().type_id()
384            );
385        }
386
387        warn!("shutdown");
388
389        process::abort();
390    }))
391}