rusteron_archive/testing.rs
1use crate::IntoCString;
2use crate::{
3 Aeron, AeronArchive, AeronArchiveAsyncConnect, AeronArchiveContext, AeronContext, Handler,
4 NoOpAeronIdleStrategyFunc,
5};
6use log::info;
7use log::{error, warn};
8use regex::Regex;
9use std::backtrace::Backtrace;
10use std::ffi::CString;
11use std::path::Path;
12use std::process::{Child, Command, ExitStatus, Stdio};
13use std::thread::sleep;
14use std::time::{Duration, Instant};
15use std::{fs, io, panic, process};
16
17pub struct EmbeddedArchiveMediaDriverProcess {
18 child: Child,
19 pub aeron_dir: CString,
20 pub archive_dir: CString,
21 pub control_request_channel: String,
22 pub control_response_channel: String,
23 pub recording_events_channel: String,
24}
25
26impl EmbeddedArchiveMediaDriverProcess {
27 /// Builds the Aeron Archive project and starts an embedded Aeron Archive Media Driver process.
28 ///
29 /// This function ensures that the necessary Aeron `.jar` files are built using Gradle. If the required
30 /// `.jar` files are not found in the expected directory, it runs the Gradle build tasks to generate them.
31 /// Once the build is complete, it invokes the `start` function to initialize and run the Aeron Archive Media Driver.
32 ///
33 /// # Parameters
34 /// - `aeron_dir`: The directory for the Aeron media driver to use for its IPC mechanisms.
35 /// - `archive_dir`: The directory where the Aeron Archive will store its recordings and metadata.
36 /// - `control_request_channel`: The channel URI used for sending control requests to the Aeron Archive.
37 /// - `control_response_channel`: The channel URI used for receiving control responses from the Aeron Archive.
38 /// - `recording_events_channel`: The channel URI used for receiving recording event notifications from the Aeron Archive.
39 ///
40 /// # Returns
41 /// On success, returns an instance of `EmbeddedArchiveMediaDriverProcess` encapsulating the child process
42 /// and configuration used. Returns an `io::Result` if the process fails to start or the build fails.
43 ///
44 /// # Errors
45 /// Returns an `io::Result::Err` if:
46 /// - The Gradle build fails to execute or complete.
47 /// - The required `.jar` files are still not found after building.
48 /// - The `start` function encounters an error starting the process.
49 ///
50 /// # Example
51 /// ```
52 /// use rusteron_archive::testing::EmbeddedArchiveMediaDriverProcess;
53 /// let driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
54 /// "/tmp/aeron-dir",
55 /// "/tmp/archive-dir",
56 /// "aeron:udp?endpoint=localhost:8010",
57 /// "aeron:udp?endpoint=localhost:8011",
58 /// "aeron:udp?endpoint=localhost:8012",
59 /// ).expect("Failed to build and start Aeron Archive Media Driver");
60 /// ```
61 ///
62 /// # Notes
63 /// - This function assumes the presence of a Gradle wrapper script (`gradlew` or `gradlew.bat`)
64 /// in the `aeron` directory relative to the project's root (`CARGO_MANIFEST_DIR`).
65 /// - The required `.jar` files will be generated in `aeron/aeron-all/build/libs` if not already present.
66 /// - The `build_and_start` function is a convenience wrapper for automating the build and initialization process.
67 pub fn build_and_start(
68 aeron_dir: &str,
69 archive_dir: &str,
70 control_request_channel: &str,
71 control_response_channel: &str,
72 recording_events_channel: &str,
73 ) -> io::Result<Self> {
74 let path = std::path::MAIN_SEPARATOR;
75 let gradle = if cfg!(target_os = "windows") {
76 &format!("{}{path}aeron{path}gradlew.bat", env!("CARGO_MANIFEST_DIR"),)
77 } else {
78 "./gradlew"
79 };
80 let dir = format!("{}{path}aeron", env!("CARGO_MANIFEST_DIR"),);
81 info!("running {} in {}", gradle, dir);
82
83 if !Path::new(&format!(
84 "{}{path}aeron{path}aeron-all{path}build{path}libs",
85 env!("CARGO_MANIFEST_DIR")
86 ))
87 .exists()
88 {
89 Command::new(&gradle)
90 .current_dir(dir)
91 .args([
92 ":aeron-agent:jar",
93 ":aeron-samples:jar",
94 ":aeron-archive:jar",
95 ":aeron-all:build",
96 ])
97 .stdout(Stdio::inherit())
98 .stderr(Stdio::inherit())
99 .spawn()?
100 .wait()?;
101 }
102
103 return Self::start(
104 &aeron_dir,
105 archive_dir,
106 control_request_channel,
107 control_response_channel,
108 recording_events_channel,
109 );
110 }
111
112 pub fn run_aeron_stats(&self) -> std::io::Result<Child> {
113 let main_dir = env!("CARGO_MANIFEST_DIR");
114 let dir = format!("{}/{}", main_dir, &self.aeron_dir.to_str().unwrap());
115 info!("running 'just aeron-stat {}'", dir);
116 Command::new("just")
117 .args(["aeron-stat", dir.as_str()])
118 .stdout(Stdio::inherit())
119 .stderr(Stdio::inherit())
120 .spawn()
121 }
122
123 pub fn archive_connect(&self) -> Result<(AeronArchive, Aeron), io::Error> {
124 let start = Instant::now();
125 while start.elapsed() < Duration::from_secs(30) {
126 if let Ok(aeron_context) = AeronContext::new() {
127 aeron_context.set_dir(&self.aeron_dir).expect("invalid dir");
128 aeron_context
129 .set_client_name(&CString::new("unit_test_client")?)
130 .expect("invalid client name");
131 if let Ok(aeron) = Aeron::new(&aeron_context) {
132 if aeron.start().is_ok() {
133 if let Ok(archive_context) =
134 AeronArchiveContext::new_with_no_credentials_supplier(
135 &aeron,
136 &self.control_request_channel,
137 &self.control_response_channel,
138 &self.recording_events_channel,
139 )
140 {
141 archive_context
142 .set_idle_strategy(Some(&Handler::leak(NoOpAeronIdleStrategyFunc)))
143 .expect("unable to set idle strategy");
144 if let Ok(connect) =
145 AeronArchiveAsyncConnect::new_with_aeron(&archive_context, &aeron)
146 {
147 if let Ok(archive) = connect.poll_blocking(Duration::from_secs(10))
148 {
149 let i = archive.get_archive_id();
150 assert!(i > 0);
151 info!("aeron archive media driver is up [connected with archive id {i}]");
152 sleep(Duration::from_millis(100));
153 return Ok((archive, aeron));
154 };
155 }
156 }
157 error!("aeron error: {}", Aeron::errmsg());
158 }
159 }
160 }
161 info!("waiting for aeron to start up, retrying...");
162 }
163
164 assert!(
165 start.elapsed() < Duration::from_secs(30),
166 "failed to start up aeron media driver"
167 );
168
169 return Err(std::io::Error::other(
170 "unable to start up aeron media driver client",
171 ));
172 }
173
174 /// Starts an embedded Aeron Archive Media Driver process with the specified configurations.
175 ///
176 /// This function cleans and recreates the Aeron and archive directories, configures the JVM to run
177 /// the Aeron Archive Media Driver, and starts the process with the specified control channels.
178 /// It ensures that the environment is correctly prepared for Aeron communication.
179 ///
180 /// # Parameters
181 /// - `aeron_dir`: The directory for the Aeron media driver to use for its IPC mechanisms.
182 /// - `archive_dir`: The directory where the Aeron Archive will store its recordings and metadata.
183 /// - `control_request_channel`: The channel URI used for sending control requests to the Aeron Archive.
184 /// - `control_response_channel`: The channel URI used for receiving control responses from the Aeron Archive.
185 /// - `recording_events_channel`: The channel URI used for receiving recording event notifications from the Aeron Archive.
186 ///
187 /// # Returns
188 /// On success, returns an instance of `EmbeddedArchiveMediaDriverProcess` encapsulating the child process
189 /// and configuration used. Returns an `io::Result` if the process fails to start.
190 ///
191 /// # Errors
192 /// Returns an `io::Result::Err` if:
193 /// - Cleaning or creating the directories fails.
194 /// - The required `.jar` files are missing or not found.
195 /// - The Java process fails to start.
196 ///
197 /// # Example
198 /// ```
199 /// use rusteron_archive::testing::EmbeddedArchiveMediaDriverProcess;
200 /// let driver = EmbeddedArchiveMediaDriverProcess::start(
201 /// "/tmp/aeron-dir",
202 /// "/tmp/archive-dir",
203 /// "aeron:udp?endpoint=localhost:8010",
204 /// "aeron:udp?endpoint=localhost:8011",
205 /// "aeron:udp?endpoint=localhost:8012",
206 /// ).expect("Failed to start Aeron Archive Media Driver");
207 /// ```
208 ///
209 /// # Notes
210 /// - The Aeron `.jar` files must be available under the directory `aeron/aeron-all/build/libs` relative
211 /// to the project's root (`CARGO_MANIFEST_DIR`).
212 /// - The function configures the JVM with properties for Aeron, such as enabling event logging and disabling bounds checks.
213 pub fn start(
214 aeron_dir: &str,
215 archive_dir: &str,
216 control_request_channel: &str,
217 control_response_channel: &str,
218 recording_events_channel: &str,
219 ) -> io::Result<Self> {
220 Self::clean_directory(aeron_dir)?;
221 Self::clean_directory(archive_dir)?;
222
223 // Ensure directories are recreated
224 fs::create_dir_all(aeron_dir)?;
225 fs::create_dir_all(archive_dir)?;
226
227 let binding = fs::read_dir(format!(
228 "{}/aeron/aeron-all/build/libs",
229 env!("CARGO_MANIFEST_DIR")
230 ))?
231 .filter(|f| f.is_ok())
232 .map(|f| f.unwrap())
233 .filter(|f| {
234 f.file_name()
235 .to_string_lossy()
236 .to_string()
237 .ends_with(".jar")
238 })
239 .next()
240 .unwrap()
241 .path();
242 let mut jar_path = binding.to_str().unwrap();
243 let mut agent_jar = jar_path.replace("aeron-all", "aeron-agent");
244
245 assert!(fs::exists(jar_path).unwrap_or_default());
246 if fs::exists(&agent_jar).unwrap_or_default() {
247 agent_jar = format!("-javaagent:{}", agent_jar);
248 } else {
249 agent_jar = " ".to_string();
250 }
251 let separator = if cfg!(target_os = "windows") {
252 ";"
253 } else {
254 ":"
255 };
256
257 let combined_jars = format!(
258 "{}{separator}{}",
259 jar_path,
260 jar_path.replace("aeron-all", "aeron-archive")
261 );
262 jar_path = &combined_jars;
263
264 let args = [
265 agent_jar.as_str(),
266 "--add-opens",
267 "java.base/jdk.internal.misc=ALL-UNNAMED",
268 "-cp",
269 jar_path,
270 &format!("-Daeron.dir={}", aeron_dir),
271 &format!("-Daeron.archive.dir={}", archive_dir),
272 "-Daeron.spies.simulate.connection=true",
273 // "-Daeron.event.log=admin", // this will only work if an agent is built
274 "-Daeron.event.log=all", // this will only work if an agent is built
275 "-Daeron.event.log.disable=FRAME_IN,FRAME_OUT", // this will only work if an agent is built
276 "-Daeron.event.archive.log=all",
277 "-Daeron.event.cluster.log=all",
278 // "-Daeron.term.buffer.sparse.file=false",
279 // "-Daeron.pre.touch.mapped.memory=true",
280 // "-Daeron.threading.mode=DEDICATED",
281 // "-Daeron.sender.idle.strategy=noop",
282 // "-Daeron.receiver.idle.strategy=noop",
283 // "-Daeron.conductor.idle.strategy=spin",
284 "-Dagrona.disable.bounds.checks=true",
285 &format!(
286 "-Daeron.archive.control.channel={}",
287 control_request_channel
288 ),
289 &format!(
290 "-Daeron.archive.control.response.channel={}",
291 control_response_channel
292 ),
293 &format!(
294 "-Daeron.archive.recording.events.channel={}",
295 recording_events_channel
296 ),
297 "-Daeron.archive.replication.channel=aeron:udp?endpoint=localhost:0",
298 "io.aeron.archive.ArchivingMediaDriver",
299 ];
300
301 info!(
302 "starting archive media driver [\n\tjava {}\n]",
303 args.join(" ")
304 );
305
306 let child = Command::new("java")
307 .args(args)
308 .stdout(Stdio::inherit())
309 .stderr(Stdio::inherit())
310 .spawn()?;
311
312 info!(
313 "started archive media driver [{:?}",
314 fs::read_dir(aeron_dir)?.collect::<Vec<_>>()
315 );
316
317 Ok(EmbeddedArchiveMediaDriverProcess {
318 child,
319 aeron_dir: aeron_dir.into_c_string(),
320 archive_dir: archive_dir.into_c_string(),
321 control_request_channel: control_request_channel.to_string(),
322 control_response_channel: control_response_channel.to_string(),
323 recording_events_channel: recording_events_channel.to_string(),
324 })
325 }
326
327 fn clean_directory(dir: &str) -> io::Result<()> {
328 info!("cleaning directory {}", dir);
329 let path = Path::new(dir);
330 if path.exists() {
331 fs::remove_dir_all(path)?;
332 }
333 Ok(())
334 }
335
336 pub fn kill_all_java_processes() -> io::Result<ExitStatus> {
337 if cfg!(not(target_os = "windows")) {
338 return Ok(std::process::Command::new("pkill")
339 .args(["-9", "java"])
340 .stdout(Stdio::inherit())
341 .stderr(Stdio::inherit())
342 .spawn()?
343 .wait()?);
344 }
345 Ok(ExitStatus::default())
346 }
347}
348
349// Use the Drop trait to ensure process cleanup and directory removal after test completion
350impl Drop for EmbeddedArchiveMediaDriverProcess {
351 fn drop(&mut self) {
352 warn!("WARN: stopping aeron archive media driver!!!!");
353 // Attempt to kill the Java process if it’s still running
354 if let Err(e) = self.child.kill() {
355 error!("Failed to kill Java process: {}", e);
356 }
357
358 // Clean up directories after the process has terminated
359 if let Err(e) = Self::clean_directory(&self.aeron_dir.to_str().unwrap()) {
360 error!("Failed to clean up Aeron directory: {}", e);
361 }
362 if let Err(e) = Self::clean_directory(&self.archive_dir.to_str().unwrap()) {
363 error!("Failed to clean up Archive directory: {}", e);
364 }
365 }
366}
367
368pub fn set_panic_hook() {
369 panic::set_hook(Box::new(|info| {
370 // Get the backtrace
371 let backtrace = Backtrace::force_capture();
372 error!("Stack trace: {backtrace:#?}");
373
374 let backtrace = format!("{:?}", backtrace);
375 // Regular expression to match the function, file, and line
376 let re = Regex::new(r#"fn: "([^"]+)", file: "([^"]+)", line: (\d+)"#).unwrap();
377
378 // Extract and print in IntelliJ format with function
379 for cap in re.captures_iter(&backtrace) {
380 let function = &cap[1];
381 let file = &cap[2];
382 let line = &cap[3];
383 info!("{file}:{line} in {function}");
384 }
385
386 error!("Panic occurred: {:#?}", info);
387
388 if let Some(payload) = info.payload().downcast_ref::<&str>() {
389 error!("Panic message: {}", payload);
390 } else if let Some(payload) = info.payload().downcast_ref::<String>() {
391 error!("Panic message: {}", payload);
392 } else {
393 // If it's not a &str or String, try to print it as Debug
394 error!(
395 "Panic with non-standard payload: {:?}",
396 info.payload().type_id()
397 );
398 }
399
400 warn!("shutdown");
401
402 process::abort();
403 }))
404}