1use crate::IntoCString;
2use crate::{Aeron, AeronArchive, AeronArchiveAsyncConnect, AeronArchiveContext, AeronContext};
3use log::info;
4use log::{error, warn};
5use regex::Regex;
6use std::backtrace::Backtrace;
7use std::ffi::CString;
8use std::path::Path;
9use std::process::{Child, Command, ExitStatus, Stdio};
10use std::thread::sleep;
11use std::time::{Duration, Instant};
12use std::{fs, io, panic, process};
13
/// Handle to an Aeron archiving media driver running as a separate `java` process.
///
/// Instances are created by [`EmbeddedArchiveMediaDriverProcess::start`] or
/// [`EmbeddedArchiveMediaDriverProcess::build_and_start`]. Dropping the value kills the
/// child process and removes both `aeron_dir` and `archive_dir` from disk.
pub struct EmbeddedArchiveMediaDriverProcess {
    /// Handle of the spawned `java` process.
    child: Child,
    /// Directory handed to the driver as `-Daeron.dir`; clients connect through it.
    pub aeron_dir: CString,
    /// Directory handed to the driver as `-Daeron.archive.dir`.
    pub archive_dir: CString,
    /// Channel handed to the driver as `-Daeron.archive.control.channel`.
    pub control_request_channel: String,
    /// Channel handed to the driver as `-Daeron.archive.control.response.channel`.
    pub control_response_channel: String,
    /// Channel handed to the driver as `-Daeron.archive.recording.events.channel`.
    pub recording_events_channel: String,
}
22
23impl EmbeddedArchiveMediaDriverProcess {
24 pub fn build_and_start(
65 aeron_dir: &str,
66 archive_dir: &str,
67 control_request_channel: &str,
68 control_response_channel: &str,
69 recording_events_channel: &str,
70 ) -> io::Result<Self> {
71 let path = std::path::MAIN_SEPARATOR;
72 let gradle = if cfg!(target_os = "windows") {
73 &format!("{}{path}aeron{path}gradlew.bat", env!("CARGO_MANIFEST_DIR"),)
74 } else {
75 "./gradlew"
76 };
77 let dir = format!("{}{path}aeron", env!("CARGO_MANIFEST_DIR"),);
78 info!("running {} in {}", gradle, dir);
79
80 if !Path::new(&format!(
81 "{}{path}aeron{path}aeron-all{path}build{path}libs",
82 env!("CARGO_MANIFEST_DIR")
83 ))
84 .exists()
85 {
86 Command::new(&gradle)
87 .current_dir(dir)
88 .args([
89 ":aeron-agent:jar",
90 ":aeron-samples:jar",
91 ":aeron-archive:jar",
92 ":aeron-all:build",
93 ])
94 .stdout(Stdio::inherit())
95 .stderr(Stdio::inherit())
96 .spawn()?
97 .wait()?;
98 }
99
100 return Self::start(
101 &aeron_dir,
102 archive_dir,
103 control_request_channel,
104 control_response_channel,
105 recording_events_channel,
106 );
107 }
108
109 pub fn run_aeron_stats(&self) -> std::io::Result<Child> {
110 let main_dir = env!("CARGO_MANIFEST_DIR");
111 let dir = format!("{}/{}", main_dir, &self.aeron_dir.to_str().unwrap());
112 info!("running 'just aeron-stat {}'", dir);
113 Command::new("just")
114 .args(["aeron-stat", dir.as_str()])
115 .stdout(Stdio::inherit())
116 .stderr(Stdio::inherit())
117 .spawn()
118 }
119
120 pub fn archive_connect(&self) -> Result<(AeronArchive, Aeron), io::Error> {
121 let start = Instant::now();
122 while start.elapsed() < Duration::from_secs(30) {
123 if let Ok(aeron_context) = AeronContext::new() {
124 aeron_context.set_dir(&self.aeron_dir).expect("invalid dir");
125 aeron_context
126 .set_client_name(&CString::new("unit_test_client")?)
127 .expect("invalid client name");
128 if let Ok(aeron) = Aeron::new(&aeron_context) {
129 if aeron.start().is_ok() {
130 if let Ok(archive_context) =
131 AeronArchiveContext::new_with_no_credentials_supplier(
132 &aeron,
133 &self.control_request_channel,
134 &self.control_response_channel,
135 &self.recording_events_channel,
136 )
137 {
138 if let Ok(connect) =
139 AeronArchiveAsyncConnect::new_with_aeron(&archive_context, &aeron)
140 {
141 if let Ok(archive) = connect.poll_blocking(Duration::from_secs(10))
142 {
143 let i = archive.get_archive_id();
144 assert!(i > 0);
145 info!("aeron archive media driver is up [connected with archive id {i}]");
146 sleep(Duration::from_millis(100));
147 return Ok((archive, aeron));
148 };
149 }
150 }
151 error!("aeron error: {}", Aeron::errmsg());
152 }
153 }
154 }
155 info!("waiting for aeron to start up, retrying...");
156 }
157
158 assert!(
159 start.elapsed() < Duration::from_secs(30),
160 "failed to start up aeron media driver"
161 );
162
163 return Err(std::io::Error::other(
164 "unable to start up aeron media driver client",
165 ));
166 }
167
168 pub fn start(
208 aeron_dir: &str,
209 archive_dir: &str,
210 control_request_channel: &str,
211 control_response_channel: &str,
212 recording_events_channel: &str,
213 ) -> io::Result<Self> {
214 Self::clean_directory(aeron_dir)?;
215 Self::clean_directory(archive_dir)?;
216
217 fs::create_dir_all(aeron_dir)?;
219 fs::create_dir_all(archive_dir)?;
220
221 let binding = fs::read_dir(format!(
222 "{}/aeron/aeron-all/build/libs",
223 env!("CARGO_MANIFEST_DIR")
224 ))?
225 .filter(|f| f.is_ok())
226 .map(|f| f.unwrap())
227 .filter(|f| {
228 f.file_name()
229 .to_string_lossy()
230 .to_string()
231 .ends_with(".jar")
232 })
233 .next()
234 .unwrap()
235 .path();
236 let mut jar_path = binding.to_str().unwrap();
237 let agent_jar = jar_path.replace("aeron-all", "aeron-agent");
238
239 assert!(fs::exists(jar_path).unwrap_or_default());
240 let mut args = vec![];
241
242 if fs::exists(&agent_jar).unwrap_or_default() {
243 args.push(format!("-javaagent:{}", agent_jar));
244 }
245 let separator = if cfg!(target_os = "windows") {
246 ";"
247 } else {
248 ":"
249 };
250
251 let combined_jars = format!(
252 "{}{separator}{}",
253 jar_path,
254 jar_path.replace("aeron-all", "aeron-archive")
255 );
256 jar_path = &combined_jars;
257
258 args.push("--add-opens".to_string());
259 args.push("java.base/jdk.internal.misc=ALL-UNNAMED".to_string());
260 args.push("-cp".to_string());
261 args.push(jar_path.to_string());
262 args.push(format!("-Daeron.dir={}", aeron_dir));
263 args.push(format!("-Daeron.archive.dir={}", archive_dir));
264 args.push("-Daeron.spies.simulate.connection=true".to_string());
265 args.push("-Daeron.event.log=all".to_string());
266 args.push("-Daeron.event.log.disable=FRAME_IN,FRAME_OUT".to_string());
267 args.push("-Daeron.event.archive.log=all".to_string());
268 args.push("-Daeron.event.cluster.log=all".to_string());
269 args.push("-Dagrona.disable.bounds.checks=true".to_string());
270 args.push(format!(
271 "-Daeron.archive.control.channel={}",
272 control_request_channel
273 ));
274 args.push(format!(
275 "-Daeron.archive.control.response.channel={}",
276 control_response_channel
277 ));
278 args.push(format!(
279 "-Daeron.archive.recording.events.channel={}",
280 recording_events_channel
281 ));
282 args.push("-Daeron.archive.replication.channel=aeron:udp?endpoint=localhost:0".to_string());
283 args.push("-Daeron.client.liveness.timeout=60000000000".to_string());
284 args.push("-Daeron.image.liveness.timeout=60000000000".to_string());
285 args.push("-Daeron.publication.unblock.timeout=65000000000".to_string());
286 args.push("io.aeron.archive.ArchivingMediaDriver".to_string());
287
288 info!(
289 "starting archive media driver [\n\tjava {}\n]",
290 args.join(" ")
291 );
292
293 let child = Command::new("java")
294 .args(args)
295 .stdout(Stdio::inherit())
296 .stderr(Stdio::inherit())
297 .spawn()?;
298
299 info!(
300 "started archive media driver [{:?}",
301 fs::read_dir(aeron_dir)?.collect::<Vec<_>>()
302 );
303
304 Ok(EmbeddedArchiveMediaDriverProcess {
305 child,
306 aeron_dir: aeron_dir.into_c_string(),
307 archive_dir: archive_dir.into_c_string(),
308 control_request_channel: control_request_channel.to_string(),
309 control_response_channel: control_response_channel.to_string(),
310 recording_events_channel: recording_events_channel.to_string(),
311 })
312 }
313
314 fn clean_directory(dir: &str) -> io::Result<()> {
315 info!("cleaning directory {}", dir);
316 let path = Path::new(dir);
317 if path.exists() {
318 fs::remove_dir_all(path)?;
319 }
320 Ok(())
321 }
322
323 pub fn kill_all_java_processes() -> io::Result<ExitStatus> {
324 if cfg!(not(target_os = "windows")) {
325 return Ok(std::process::Command::new("pkill")
326 .args(["-9", "java"])
327 .stdout(Stdio::inherit())
328 .stderr(Stdio::inherit())
329 .spawn()?
330 .wait()?);
331 }
332 Ok(ExitStatus::default())
333 }
334}
335
336impl Drop for EmbeddedArchiveMediaDriverProcess {
338 fn drop(&mut self) {
339 warn!("WARN: stopping aeron archive media driver!!!!");
340 if let Err(e) = self.child.kill() {
342 error!("Failed to kill Java process: {}", e);
343 }
344
345 if let Err(e) = Self::clean_directory(&self.aeron_dir.to_str().unwrap()) {
347 error!("Failed to clean up Aeron directory: {}", e);
348 }
349 if let Err(e) = Self::clean_directory(&self.archive_dir.to_str().unwrap()) {
350 error!("Failed to clean up Archive directory: {}", e);
351 }
352 }
353}
354
355pub fn set_panic_hook() {
356 panic::set_hook(Box::new(|info| {
357 let backtrace = Backtrace::force_capture();
359 error!("Stack trace: {backtrace:#?}");
360
361 let backtrace = format!("{:?}", backtrace);
362 let re = Regex::new(r#"fn: "([^"]+)", file: "([^"]+)", line: (\d+)"#).unwrap();
364
365 for cap in re.captures_iter(&backtrace) {
367 let function = &cap[1];
368 let file = &cap[2];
369 let line = &cap[3];
370 info!("{file}:{line} in {function}");
371 }
372
373 error!("Panic occurred: {:#?}", info);
374
375 if let Some(payload) = info.payload().downcast_ref::<&str>() {
376 error!("Panic message: {}", payload);
377 } else if let Some(payload) = info.payload().downcast_ref::<String>() {
378 error!("Panic message: {}", payload);
379 } else {
380 error!(
382 "Panic with non-standard payload: {:?}",
383 info.payload().type_id()
384 );
385 }
386
387 warn!("shutdown");
388
389 process::abort();
390 }))
391}