"""A class for the car acoustic environment."""
# pylint: disable=import-error
# pylint: disable=too-many-instance-attributes
from __future__ import annotations
import logging
import warnings
from pathlib import Path
import numpy as np
import pyloudnorm as pyln
from omegaconf import DictConfig
from scipy.io import wavfile
from scipy.signal import lfilter
from clarity.enhancer.compressor import Compressor
from clarity.enhancer.nalr import NALR
from clarity.utils.audiogram import Audiogram, Listener
from clarity.utils.car_noise_simulator.carnoise_signal_generator import (
CarNoiseSignalGenerator,
)
from recipes.cad1.task2.baseline.audio_manager import AudioManager
logger = logging.getLogger(__name__)
class CarSceneAcoustics:
"""
A class for the car acoustic environment.
Constants:
ANECHOIC_HRTF_FOR_NOISE (dict): A dictionary containing the names of the
anechoic BRIRs for the following directions:
0 degrees: front
- 000_left: The left channel of the BRIR for 0 degrees.
- 000_right: The right channel of the BRIR for 0 degrees.
-90 degrees: left
- m90_left: The left channel of the BRIR for -90 degrees.
- m90_right: The right channel of the BRIR for -90 degrees.
90 degrees: right
- p90_left: The left channel of the BRIR for 90 degrees.
- p90_right: The right channel of the BRIR for 90 degrees.
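
    Example:
        A minimal sketch of constructing the class. The config values below
        are illustrative placeholders, not the baseline defaults; the keys
        must match the constructor arguments of NALR and Compressor.

        >>> acoustics = CarSceneAcoustics(
        ...     track_duration=30,
        ...     sample_rate=44100,
        ...     hrtf_dir="path/to/eBrird",
        ...     config_nalr={"nfir": 220, "sample_rate": 44100},
        ...     config_compressor={"fs": 44100, "attack": 5, "release": 20},
        ... )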
"""
ANECHOIC_HRTF_FOR_NOISE = {
"000_left": "HR36_E02_CH1_Left.wav",
"000_right": "HR36_E02_CH1_Right.wav",
"m90_left": "HR0_E02_CH1_Left.wav",
"m90_right": "HR0_E02_CH1_Right.wav",
"p90_left": "HR72_E02_CH1_Left.wav",
"p90_right": "HR72_E02_CH1_Right.wav",
}
def __init__(
self,
track_duration: int,
sample_rate: int,
hrtf_dir: str,
config_nalr: dict,
config_compressor: dict,
extend_noise: float = 0.2,
):
"""
Initializes the CarSceneAcoustics object.
Args:
            track_duration (int): The duration of the audio track in seconds.
            sample_rate (int): The sample rate of the audio in Hz.
            hrtf_dir (str): The path to the directory containing the BRIR files.
            config_nalr (dict): The configuration for the NALR enhancer.
            config_compressor (dict): The configuration for the compressor.
            extend_noise (float): The factor by which to extend the duration of
                the car noise generated by the CarNoiseSignalGenerator.
                Defaults to 0.2. This prevents the car noise from being shorter
                than the audio track.
        """
self.track_duration = track_duration * (1 + extend_noise)
self.sample_rate = sample_rate
self.hrtf_dir = hrtf_dir
self.preload_anechoic_hrtf(self.hrtf_dir)
self.enhancer = NALR(**config_nalr)
self.compressor = Compressor(**config_compressor)
self.carnoise = CarNoiseSignalGenerator(
duration_secs=self.track_duration,
sample_rate=self.sample_rate,
)
self.loudness_meter = pyln.Meter(self.sample_rate)
def preload_anechoic_hrtf(self, hrtf_dir: str) -> None:
"""
Loads the Anechoic BRIRs from the eBrird database for the given directions.
Using the following directions:
0 degrees: front
-90 degrees: left
90 degrees: right
Args:
            hrtf_dir (str): The path to the directory containing the BRIR files.
"""
self.hrir_for_noise = {}
anechoic_hrtf_dir = Path(hrtf_dir) / "Anechoic" / "audio"
for key, item in self.ANECHOIC_HRTF_FOR_NOISE.items():
self.hrir_for_noise[key] = wavfile.read(anechoic_hrtf_dir / item)[1]
def apply_hearing_aid(self, signal: np.ndarray, audiogram: Audiogram) -> np.ndarray:
"""
        Applies the hearing aid: a NAL-R prescription filter followed by
        a compressor.
Args:
signal (np.ndarray): The audio signal to be enhanced.
audiogram (Audiogram): The audiogram of the listener.
Returns:
np.ndarray: The enhanced audio signal.
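
        Example:
            A sketch processing one channel, assuming an Audiogram built from
            hearing levels and the corresponding frequencies (values below
            are illustrative):

            >>> audiogram = Audiogram(
            ...     levels=np.array([45, 45, 35, 45, 60, 65]),
            ...     frequencies=np.array([250, 500, 1000, 2000, 4000, 6000]),
            ... )
            >>> enhanced = acoustics.apply_hearing_aid(signal[0, :], audiogram)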
"""
nalr_fir, _ = self.enhancer.build(audiogram)
signal = self.enhancer.apply(nalr_fir, signal)
signal, _, _ = self.compressor.process(signal)
return signal
def add_anechoic_hrtf_to_noise(self, noise_signal: np.ndarray) -> np.ndarray:
"""
Adds the Anechoic HRTF to the noise signal.
Args:
            noise_signal: A numpy array of shape (n_components, n_samples)
                containing the components of the car noise signal; rows 0, 1
                and 2 are the engine and the two noise sources.
Returns:
np.ndarray: The noise signal with the Anechoic HRTF applied.
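
        Example:
            A sketch with synthetic one-second components; rows 0, 1 and 2
            are the engine and the two noise sources the filtering expects:

            >>> components = np.random.randn(3, 44100)
            >>> binaural = acoustics.add_anechoic_hrtf_to_noise(components)
            >>> binaural.shape
            (2, 44100)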
"""
# Apply Anechoic HRTF to the noise signal
# Engine first
        out_left = lfilter(self.hrir_for_noise["000_left"], 1, noise_signal[0, :])
        out_right = lfilter(self.hrir_for_noise["000_right"], 1, noise_signal[0, :])
        # Noise processing is hardwired for two noise sources
        out_left += lfilter(self.hrir_for_noise["m90_left"], 1, noise_signal[1, :])
        out_right += lfilter(self.hrir_for_noise["m90_right"], 1, noise_signal[1, :])
        # Swap HRIRs so the second noise source is on the other side
        out_left += lfilter(self.hrir_for_noise["p90_left"], 1, noise_signal[2, :])
        out_right += lfilter(self.hrir_for_noise["p90_right"], 1, noise_signal[2, :])
        return np.stack([out_left, out_right], axis=0)
def get_car_noise(
self,
car_noise_params: dict,
) -> np.ndarray:
"""
Generates car noise.
Args:
            car_noise_params (dict): Car noise parameters, as generated by
                the CarNoiseParameterGenerator class.
Returns:
numpy.ndarray: A numpy array representing the different components
of the car noise signal
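
        Example:
            A sketch, assuming the parameters are taken from a scene
            dictionary as in apply_car_acoustics_to_signal:

            >>> car_noise = acoustics.get_car_noise(scene["car_noise_parameters"])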
"""
return self.carnoise.generate_car_noise(
noise_parameters=car_noise_params,
number_noise_sources=2,
commonness_factor=0,
)
def add_hrtf_to_stereo_signal(
self, signal: np.ndarray, hrir: dict, hrtf_type: str
) -> np.ndarray:
"""Add a head rotation transfer function using binaural room impulse
response (BRIR) from eBrird.
Args:
signal (np.ndarray): a numpy array of shape (2, n_samples) containing the
stereo audio signal.
hrir: a dictionary containing the HRIR (head-related impulse response)
filenames.
hrtf_type: the type of HRTF to use. Can be either "Anechoic" or "Car".
Returns:
A numpy array of shape (2, n_samples) containing the stereo audio signal
with the BRIR added.
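
        Example:
            The hrir dictionary must map each speaker to the wav filename
            stems (without the ".wav" extension, which is appended here) for
            each ear. The filenames below are hypothetical:

            >>> hrir = {
            ...     "left_speaker": {"left_side": "LS03_Left", "right_side": "LS03_Right"},
            ...     "right_speaker": {"left_side": "LS04_Left", "right_side": "LS04_Right"},
            ... }
            >>> out = acoustics.add_hrtf_to_stereo_signal(signal, hrir, "Car")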
"""
car_hrtf_path = Path(self.hrtf_dir) / hrtf_type / "audio"
# HRTF from left speaker
hr_ls03_ch1_left = wavfile.read(
car_hrtf_path / f"{hrir['left_speaker']['left_side']}.wav"
)[1]
hr_ls03_ch1_right = wavfile.read(
car_hrtf_path / f"{hrir['left_speaker']['right_side']}.wav"
)[1]
# HRTF from right speaker
hr_ls04_ch1_left = wavfile.read(
car_hrtf_path / f"{hrir['right_speaker']['left_side']}.wav"
)[1]
hr_ls04_ch1_right = wavfile.read(
car_hrtf_path / f"{hrir['right_speaker']['right_side']}.wav"
)[1]
# add the BRIRs to the signal
# Left Speaker (LS03)
out_left = lfilter(hr_ls03_ch1_left, 1, signal[0, :])
out_right = lfilter(hr_ls03_ch1_right, 1, signal[0, :])
# Right Speaker (LS04)
out_left += lfilter(hr_ls04_ch1_left, 1, signal[1, :])
out_right += lfilter(hr_ls04_ch1_right, 1, signal[1, :])
return np.stack([out_left, out_right], axis=0)
def scale_signal_to_snr(
self,
signal: np.ndarray,
reference_signal: np.ndarray,
snr: float = 0.0,
) -> np.ndarray:
"""
Scales the target signal to the desired SNR.
        We transpose the channels because pyloudnorm operates
        on arrays with shape [n_samples, n_channels].
Args:
            signal (np.ndarray): The target signal to scale.
            reference_signal (np.ndarray): The reference signal.
            snr (float): The desired SNR in dB.
Returns:
np.ndarray: The scaled target signal.
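
        Example:
            If the reference measures -20.0 LUFS and snr is 5.0, the signal
            is normalised to -25.0 LUFS. A sketch with synthetic one-second
            stereo signals:

            >>> rng = np.random.default_rng(0)
            >>> noise = rng.standard_normal((2, 44100)) * 0.1
            >>> target = rng.standard_normal((2, 44100)) * 0.1
            >>> scaled_noise = acoustics.scale_signal_to_snr(noise, target, snr=5.0)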
"""
        # Ensure channels are in the last dimension, as pyloudnorm expects
        # arrays of shape [n_samples, n_channels]
        if reference_signal.shape[0] < reference_signal.shape[1]:
            reference_signal = reference_signal.T
        if signal.shape[0] < signal.shape[1]:
            signal = signal.T
        ref_signal_lufs = self.loudness_meter.integrated_loudness(reference_signal)
        signal_lufs = self.loudness_meter.integrated_loudness(signal)
        target_lufs = ref_signal_lufs - snr
with warnings.catch_warnings(record=True):
normalised_signal = pyln.normalize.loudness(
signal, signal_lufs, target_lufs
)
# return to original shape
return normalised_signal.T
def equalise_level(
self, signal: np.ndarray, reference_signal: np.ndarray, max_level: float = 20
) -> np.ndarray:
"""
Equalises the level of the target signal to the reference signal.
Args:
signal (np.ndarray): The target signal to equalise.
reference_signal (np.ndarray): The reference signal.
            max_level (float): The maximum loudness (in LUFS) allowed for the
                target signal. This is to prevent clipping.
Returns:
np.ndarray: The equalised target signal.
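
        Example:
            If the reference measures -14.0 LUFS and max_level is 20, the
            target loudness is min(-14.0, 20) = -14.0 LUFS. Both inputs are
            metered as signal.T, so they are expected channels-first, with
            shape (2, n_samples).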
"""
signal_lufs = self.loudness_meter.integrated_loudness(signal.T)
target_lufs = self.loudness_meter.integrated_loudness(reference_signal.T)
with warnings.catch_warnings(record=True):
return pyln.normalize.loudness(
signal, signal_lufs, min(target_lufs, max_level)
)
@staticmethod
def add_two_signals(signal1: np.ndarray, signal2: np.ndarray) -> np.ndarray:
"""
Adds two signals together.
Args:
signal1 (np.ndarray): The first signal.
signal2 (np.ndarray): The second signal.
Returns:
np.ndarray: The sum of the two signals.
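
        Example:
            The shorter signal sets the output length:

            >>> a = np.ones((2, 5))
            >>> b = np.ones((2, 3))
            >>> CarSceneAcoustics.add_two_signals(a, b).shape
            (2, 3)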
"""
min_length = min(signal1.shape[1], signal2.shape[1])
return signal1[:, :min_length] + signal2[:, :min_length]
def apply_car_acoustics_to_signal(
self,
enh_signal: np.ndarray,
scene: dict,
listener: Listener,
hrtf: dict,
audio_manager: AudioManager,
config: DictConfig,
) -> np.ndarray:
"""
Applies the car acoustics to the enhanced signal.
Args:
enh_signal (np.ndarray): The enhanced signal to apply the car acoustics to.
scene (dict): The scene dictionary with the acoustics parameters.
listener (Listener): The listener characteristics.
hrtf (dict): A dictionary containing the head-related transfer functions
(HRTFs) for the listener being evaluated. This includes the left and
right HRTFs for the car and the anechoic room.
audio_manager (AudioManager): The audio manager object.
config (DictConfig): The config object.
        Returns:
            np.ndarray: The enhanced signal with the car acoustics applied.
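
        Example:
            A sketch of the full pipeline call. The objects are as loaded by
            the baseline evaluation script; the names here are illustrative:

            >>> processed = acoustics.apply_car_acoustics_to_signal(
            ...     enh_signal=enh_signal,
            ...     scene=scene,
            ...     listener=listener,
            ...     hrtf=hrtfs[listener.id],
            ...     audio_manager=audio_manager,
            ...     config=config,
            ... )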
"""
# 1. Generates car noise and adds anechoic HRTFs to the car noise
# car_noise_anechoic = car_noise + anechoic HRTF
car_noise = self.get_car_noise(scene["car_noise_parameters"])
car_noise_anechoic = self.add_anechoic_hrtf_to_noise(car_noise)
if config.evaluate.save_intermediate_wavs:
audio_manager.add_audios_to_save("car_noise_anechoic", car_noise_anechoic)
# 2. Add HRTFs to enhanced signal
# processed_signal = enh_signal + car HRTF
processed_signal = self.add_hrtf_to_stereo_signal(
enh_signal, hrtf["car"], "Car"
)
if config.evaluate.save_intermediate_wavs:
audio_manager.add_audios_to_save("enh_signal_hrtf", processed_signal)
# 3. Scale noise to target SNR
# car_noise_anechoic = car_noise_anechoic * scale_factor
car_noise_anechoic = self.scale_signal_to_snr(
signal=car_noise_anechoic,
reference_signal=processed_signal,
snr=float(scene["snr"]),
)
if config.evaluate.save_intermediate_wavs:
audio_manager.add_audios_to_save(
"car_noise_anechoic_scaled", car_noise_anechoic
)
# 4. Add the scaled anechoic car noise to the enhanced signal
# processed_signal = (enh_signal * car HRTF)
# + (car_noise * Anechoic HRTF) * scale_factor
processed_signal = self.add_two_signals(processed_signal, car_noise_anechoic)
if config.evaluate.save_intermediate_wavs:
audio_manager.add_audios_to_save(
"enh_signal_hrtf_plus_car_noise_anechoic", processed_signal
)
# 5. Apply Hearing Aid to Left and Right channels and join them
processed_signal_left = self.apply_hearing_aid(
processed_signal[0, :], listener.audiogram_left
)
processed_signal_right = self.apply_hearing_aid(
processed_signal[1, :], listener.audiogram_right
)
processed_signal = np.stack(
[processed_signal_left, processed_signal_right], axis=0
)
# processed_signal = np.clip(processed_signal, -1.0, 1.0)
n_clipped, processed_signal = audio_manager.clip_audio(processed_signal)
if n_clipped > 0:
logger.warning(
f"Scene {scene['scene']}: {n_clipped}"
" samples clipped in evaluation signal."
)
audio_manager.add_audios_to_save("ha_processed_signal", processed_signal)
return processed_signal