Source code for recipes.cad1.task2.baseline.car_scene_acoustics

"""A class for the car acoustic environment."""

# pylint: disable=import-error
# pylint: disable=too-many-instance-attributes
from __future__ import annotations

import logging
import warnings
from pathlib import Path

import numpy as np
import pyloudnorm as pyln
from omegaconf import DictConfig
from scipy.io import wavfile
from scipy.signal import lfilter

from clarity.enhancer.compressor import Compressor
from clarity.enhancer.nalr import NALR
from clarity.utils.audiogram import Audiogram, Listener
from clarity.utils.car_noise_simulator.carnoise_signal_generator import (
    CarNoiseSignalGenerator,
)
from recipes.cad1.task2.baseline.audio_manager import AudioManager

logger = logging.getLogger(__name__)


class CarSceneAcoustics:
    """
    A class for the car acoustic environment.

    Constants:
        ANECHOIC_HRTF_FOR_NOISE (dict): A dictionary containing the names of
            the anechoic BRIRs for the following directions:

            0 degrees: front
                - 000_left: The left channel of the BRIR for 0 degrees.
                - 000_right: The right channel of the BRIR for 0 degrees.
            -90 degrees: left
                - m90_left: The left channel of the BRIR for -90 degrees.
                - m90_right: The right channel of the BRIR for -90 degrees.
            90 degrees: right
                - p90_left: The left channel of the BRIR for 90 degrees.
                - p90_right: The right channel of the BRIR for 90 degrees.
    """

    ANECHOIC_HRTF_FOR_NOISE = {
        "000_left": "HR36_E02_CH1_Left.wav",
        "000_right": "HR36_E02_CH1_Right.wav",
        "m90_left": "HR0_E02_CH1_Left.wav",
        "m90_right": "HR0_E02_CH1_Right.wav",
        "p90_left": "HR72_E02_CH1_Left.wav",
        "p90_right": "HR72_E02_CH1_Right.wav",
    }

    def __init__(
        self,
        track_duration: int,
        sample_rate: int,
        hrtf_dir: str,
        config_nalr: dict,
        config_compressor: dict,
        extend_noise: float = 0.2,
    ):
        """
        Initializes the CarSceneAcoustics object.

        Args:
            track_duration (int): The duration of the audio track in seconds.
            sample_rate (int): The sample rate of the audio in Hz.
            hrtf_dir (str): The path to the directory containing the BRIR files.
            config_nalr (dict): The configuration for the NALR enhancer.
            config_compressor (dict): The configuration for the compressor.
            extend_noise (float): The factor by which to extend the duration of
                the car noise generated by the CarNoiseSignalGenerator.
                Defaults to 0.2. This prevents the car noise from being shorter
                than the audio track.
        """
        self.track_duration = track_duration * (1 + extend_noise)
        self.sample_rate = sample_rate
        self.hrtf_dir = hrtf_dir
        self.preload_anechoic_hrtf(self.hrtf_dir)
        self.enhancer = NALR(**config_nalr)
        self.compressor = Compressor(**config_compressor)
        self.carnoise = CarNoiseSignalGenerator(
            duration_secs=self.track_duration,
            sample_rate=self.sample_rate,
        )
        self.loudness_meter = pyln.Meter(self.sample_rate)
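    # Illustrative construction sketch (not from the original module). The
    # path and the NALR/Compressor config keys below are assumptions; in the
    # baseline they come from the recipe's Hydra config.
    #
    # >>> acoustics = CarSceneAcoustics(
    # ...     track_duration=30,
    # ...     sample_rate=44100,
    # ...     hrtf_dir="path/to/eBrird",
    # ...     config_nalr={"nfir": 220, "sample_rate": 44100},
    # ...     config_compressor={"fs": 44100, "attack": 5, "release": 20},
    # ... )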
    def preload_anechoic_hrtf(self, hrtf_dir: str) -> None:
        """
        Loads the anechoic BRIRs from the eBrird database for the following
        directions:

            0 degrees: front
            -90 degrees: left
            90 degrees: right

        Args:
            hrtf_dir (str): The path to the directory containing the BRIR files.
        """
        self.hrir_for_noise = {}
        anechoic_hrtf_dir = Path(hrtf_dir) / "Anechoic" / "audio"
        for key, item in self.ANECHOIC_HRTF_FOR_NOISE.items():
            self.hrir_for_noise[key] = wavfile.read(anechoic_hrtf_dir / item)[1]
    def apply_hearing_aid(
        self, signal: np.ndarray, audiogram: Audiogram
    ) -> np.ndarray:
        """
        Applies the hearing aid: a NALR prescription filter followed by a
        compressor.

        Args:
            signal (np.ndarray): The audio signal to be enhanced.
            audiogram (Audiogram): The audiogram of the listener.

        Returns:
            np.ndarray: The enhanced audio signal.
        """
        nalr_fir, _ = self.enhancer.build(audiogram)
        signal = self.enhancer.apply(nalr_fir, signal)
        signal, _, _ = self.compressor.process(signal)
        return signal
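    # Usage sketch: the baseline applies the hearing aid per channel (see
    # apply_car_acoustics_to_signal below). `stereo` and `listener` are
    # assumed to be a (2, n_samples) array and a clarity Listener.
    #
    # >>> left = acoustics.apply_hearing_aid(stereo[0, :], listener.audiogram_left)
    # >>> right = acoustics.apply_hearing_aid(stereo[1, :], listener.audiogram_right)
    # >>> ha_out = np.stack([left, right], axis=0)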
    def add_anechoic_hrtf_to_noise(self, noise_signal: np.ndarray) -> np.ndarray:
        """
        Adds the anechoic HRTF to the noise signal.

        Args:
            noise_signal: A numpy array representing the different components
                of the car noise signal.

        Returns:
            np.ndarray: The noise signal with the anechoic HRTF applied.
        """
        # Apply the anechoic HRTF to each noise component.
        # Engine first (rendered from the front, 0 degrees).
        out_left = lfilter(self.hrir_for_noise["000_left"], 1, noise_signal[0, :])
        out_right = lfilter(self.hrir_for_noise["000_right"], 1, noise_signal[0, :])

        # Noise processing is hardwired for 2 additional noise sources.
        out_left += lfilter(self.hrir_for_noise["m90_left"], 1, noise_signal[1, :])
        out_right += lfilter(self.hrir_for_noise["m90_right"], 1, noise_signal[1, :])

        # Swap the HRIR so this noise source is on the other side.
        out_left += lfilter(self.hrir_for_noise["p90_left"], 1, noise_signal[2, :])
        out_right += lfilter(self.hrir_for_noise["p90_right"], 1, noise_signal[2, :])

        return np.stack([out_left, out_right], axis=0)
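    # Note: lfilter(h, 1, x) runs h as an FIR filter, i.e. it convolves x
    # with the impulse response h, truncated to len(x). A minimal equivalence
    # check with synthetic data (illustrative only):
    #
    # >>> rng = np.random.default_rng(0)
    # >>> h = rng.standard_normal(16)
    # >>> x = rng.standard_normal(1000)
    # >>> np.allclose(lfilter(h, 1, x), np.convolve(x, h)[: len(x)])
    # True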
    def get_car_noise(
        self,
        car_noise_params: dict,
    ) -> np.ndarray:
        """
        Generates car noise.

        Args:
            car_noise_params (dict): Car noise parameters as generated by the
                CarNoiseParameterGenerator class.

        Returns:
            np.ndarray: A numpy array representing the different components
                of the car noise signal.
        """
        return self.carnoise.generate_car_noise(
            noise_parameters=car_noise_params,
            number_noise_sources=2,
            commonness_factor=0,
        )
    def add_hrtf_to_stereo_signal(
        self, signal: np.ndarray, hrir: dict, hrtf_type: str
    ) -> np.ndarray:
        """Adds a head rotation transfer function using a binaural room
        impulse response (BRIR) from eBrird.

        Args:
            signal (np.ndarray): A numpy array of shape (2, n_samples)
                containing the stereo audio signal.
            hrir (dict): A dictionary containing the HRIR (head-related
                impulse response) filenames.
            hrtf_type (str): The type of HRTF to use. Can be either
                "Anechoic" or "Car".

        Returns:
            np.ndarray: A numpy array of shape (2, n_samples) containing the
                stereo audio signal with the BRIR added.
        """
        car_hrtf_path = Path(self.hrtf_dir) / hrtf_type / "audio"

        # HRTF from the left speaker
        hr_ls03_ch1_left = wavfile.read(
            car_hrtf_path / f"{hrir['left_speaker']['left_side']}.wav"
        )[1]
        hr_ls03_ch1_right = wavfile.read(
            car_hrtf_path / f"{hrir['left_speaker']['right_side']}.wav"
        )[1]

        # HRTF from the right speaker
        hr_ls04_ch1_left = wavfile.read(
            car_hrtf_path / f"{hrir['right_speaker']['left_side']}.wav"
        )[1]
        hr_ls04_ch1_right = wavfile.read(
            car_hrtf_path / f"{hrir['right_speaker']['right_side']}.wav"
        )[1]

        # Add the BRIRs to the signal.
        # Left speaker (LS03)
        out_left = lfilter(hr_ls03_ch1_left, 1, signal[0, :])
        out_right = lfilter(hr_ls03_ch1_right, 1, signal[0, :])

        # Right speaker (LS04)
        out_left += lfilter(hr_ls04_ch1_left, 1, signal[1, :])
        out_right += lfilter(hr_ls04_ch1_right, 1, signal[1, :])

        return np.stack([out_left, out_right], axis=0)
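    # The `hrir` argument is expected to map each speaker to eBrird filename
    # stems (without the ".wav" extension), as read by the code above.
    # Sketch (stems elided; actual values come from the scene metadata):
    #
    # >>> hrir = {
    # ...     "left_speaker": {"left_side": "...", "right_side": "..."},
    # ...     "right_speaker": {"left_side": "...", "right_side": "..."},
    # ... }
    # >>> binaural = acoustics.add_hrtf_to_stereo_signal(stereo, hrir, "Car")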
    def scale_signal_to_snr(
        self,
        signal: np.ndarray,
        reference_signal: np.ndarray,
        snr: float | None = 0,
    ) -> np.ndarray:
        """
        Scales the signal to the desired SNR relative to the reference signal.

        Channels are transposed because pyloudnorm operates on arrays of
        shape [n_samples, n_channels].

        Args:
            signal (np.ndarray): The signal to scale.
            reference_signal (np.ndarray): The reference signal. If None, the
                reference loudness is taken as 0 LUFS.
            snr (float): The desired SNR gain in dB. If None, the signal is
                scaled to the level of the reference signal.

        Returns:
            np.ndarray: The scaled signal.
        """
        # Ensure channels are in the correct dimension
        if (
            reference_signal is not None
            and reference_signal.shape[0] < reference_signal.shape[1]
        ):
            reference_signal = reference_signal.T
        if signal.shape[0] < signal.shape[1]:
            signal = signal.T

        ref_signal_lufs = (
            0.0
            if reference_signal is None
            else self.loudness_meter.integrated_loudness(reference_signal)
        )
        signal_lufs = self.loudness_meter.integrated_loudness(signal)

        target_lufs = ref_signal_lufs if snr is None else ref_signal_lufs - snr

        with warnings.catch_warnings(record=True):
            normalised_signal = pyln.normalize.loudness(
                signal, signal_lufs, target_lufs
            )

        # Return to the original shape
        return normalised_signal.T
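    # Worked example of the scaling arithmetic: with a reference measuring
    # -20 LUFS and snr=10, the signal is normalised to
    # target_lufs = -20 - 10 = -30 LUFS, i.e. pyloudnorm applies a gain of
    # (target_lufs - signal_lufs) dB. Sketch (shapes assumed (2, n_samples)):
    #
    # >>> scaled_noise = acoustics.scale_signal_to_snr(
    # ...     signal=car_noise_anechoic, reference_signal=enh_signal, snr=10.0
    # ... )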
    def equalise_level(
        self, signal: np.ndarray, reference_signal: np.ndarray, max_level: float = 20
    ) -> np.ndarray:
        """
        Equalises the level of the signal to that of the reference signal.

        Args:
            signal (np.ndarray): The signal to equalise.
            reference_signal (np.ndarray): The reference signal.
            max_level (float): The maximum level of the output signal, in
                LUFS. This is to prevent clipping.

        Returns:
            np.ndarray: The equalised signal.
        """
        signal_lufs = self.loudness_meter.integrated_loudness(signal.T)
        target_lufs = self.loudness_meter.integrated_loudness(reference_signal.T)

        with warnings.catch_warnings(record=True):
            return pyln.normalize.loudness(
                signal, signal_lufs, min(target_lufs, max_level)
            )
    @staticmethod
    def add_two_signals(signal1: np.ndarray, signal2: np.ndarray) -> np.ndarray:
        """
        Adds two signals together, truncating both to the length of the
        shorter one.

        Args:
            signal1 (np.ndarray): The first signal.
            signal2 (np.ndarray): The second signal.

        Returns:
            np.ndarray: The sum of the two signals.
        """
        min_length = min(signal1.shape[1], signal2.shape[1])
        return signal1[:, :min_length] + signal2[:, :min_length]
    def apply_car_acoustics_to_signal(
        self,
        enh_signal: np.ndarray,
        scene: dict,
        listener: Listener,
        hrtf: dict,
        audio_manager: AudioManager,
        config: DictConfig,
    ) -> np.ndarray:
        """
        Applies the car acoustics to the enhanced signal.

        Args:
            enh_signal (np.ndarray): The enhanced signal to apply the car
                acoustics to.
            scene (dict): The scene dictionary with the acoustics parameters.
            listener (Listener): The listener characteristics.
            hrtf (dict): A dictionary containing the head-related transfer
                functions (HRTFs) for the listener being evaluated. This
                includes the left and right HRTFs for the car and the
                anechoic room.
            audio_manager (AudioManager): The audio manager object.
            config (DictConfig): The config object.

        Returns:
            np.ndarray: The enhanced signal with the car acoustics applied.
        """
        # 1. Generate car noise and add the anechoic HRTFs to it:
        #    car_noise_anechoic = car_noise * anechoic HRTF
        car_noise = self.get_car_noise(scene["car_noise_parameters"])
        car_noise_anechoic = self.add_anechoic_hrtf_to_noise(car_noise)
        if config.evaluate.save_intermediate_wavs:
            audio_manager.add_audios_to_save("car_noise_anechoic", car_noise_anechoic)

        # 2. Add the car HRTFs to the enhanced signal:
        #    processed_signal = enh_signal * car HRTF
        processed_signal = self.add_hrtf_to_stereo_signal(
            enh_signal, hrtf["car"], "Car"
        )
        if config.evaluate.save_intermediate_wavs:
            audio_manager.add_audios_to_save("enh_signal_hrtf", processed_signal)

        # 3. Scale the noise to the target SNR:
        #    car_noise_anechoic = car_noise_anechoic * scale_factor
        car_noise_anechoic = self.scale_signal_to_snr(
            signal=car_noise_anechoic,
            reference_signal=processed_signal,
            snr=float(scene["snr"]),
        )
        if config.evaluate.save_intermediate_wavs:
            audio_manager.add_audios_to_save(
                "car_noise_anechoic_scaled", car_noise_anechoic
            )

        # 4. Add the scaled anechoic car noise to the enhanced signal:
        #    processed_signal = (enh_signal * car HRTF)
        #                       + (car_noise * anechoic HRTF) * scale_factor
        processed_signal = self.add_two_signals(processed_signal, car_noise_anechoic)
        if config.evaluate.save_intermediate_wavs:
            audio_manager.add_audios_to_save(
                "enh_signal_hrtf_plus_car_noise_anechoic", processed_signal
            )

        # 5. Apply the hearing aid to the left and right channels and
        #    join them back into a stereo signal.
        processed_signal_left = self.apply_hearing_aid(
            processed_signal[0, :], listener.audiogram_left
        )
        processed_signal_right = self.apply_hearing_aid(
            processed_signal[1, :], listener.audiogram_right
        )
        processed_signal = np.stack(
            [processed_signal_left, processed_signal_right], axis=0
        )

        n_clipped, processed_signal = audio_manager.clip_audio(processed_signal)
        if n_clipped > 0:
            logger.warning(
                f"Scene {scene['scene']}: {n_clipped}"
                " samples clipped in evaluation signal."
            )
        audio_manager.add_audios_to_save("ha_processed_signal", processed_signal)

        return processed_signal
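# End-to-end sketch of the evaluation path (illustrative; `scene`, `hrtf`,
# `audio_manager`, and `config` are assumed to follow the task 2 baseline
# structures used above):
#
# >>> out = acoustics.apply_car_acoustics_to_signal(
# ...     enh_signal=enh_signal,
# ...     scene=scene,
# ...     listener=listener,
# ...     hrtf=hrtf,
# ...     audio_manager=audio_manager,
# ...     config=config,
# ... )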