# Source code for recipes.cpc1.e032_sheffield.prepare_data

"""Data preparation for the E032 Sheffield CPC1 recipe."""

import csv
import json
import logging
import random
from pathlib import Path

import hydra
import numpy as np
import soundfile as sf
from librosa import resample  # pylint: disable=no-name-in-module
from omegaconf import DictConfig
from tqdm import tqdm

from clarity.evaluator.msbg.msbg import Ear
from clarity.evaluator.msbg.msbg_utils import MSBG_FS, pad
from clarity.utils.audiogram import Listener
from clarity.utils.file_io import read_signal, write_signal

logger = logging.getLogger(__name__)

target_sample_rate = 16000


def run_data_split(cfg, track):
    """Split the CPC1 training scenes into train and dev scene lists.

    Writes ``scene_train_list{track}.json`` and ``scene_dev_list{track}.json``
    under ``<exp_folder>/data_split``; does nothing if both already exist.

    :param cfg: Hydra config providing ``path.exp_folder``, ``path.root`` and
        ``dev_percent`` (fraction of scenes assigned to the dev set).
    :param track: "" for the closed track, "_indep" for the open track.
    """
    split_dir = Path(cfg.path.exp_folder) / "data_split"
    split_dir.mkdir(parents=True, exist_ok=True)
    train_json = split_dir / f"scene_train_list{track}.json"
    dev_json = split_dir / f"scene_dev_list{track}.json"
    if train_json.exists() and dev_json.exists():
        logger.info("Train set and dev set lists exist...")
        return

    metadata_path = (
        Path(cfg.path.root)
        / f"clarity_CPC1_data_train/metadata/CPC1.train{track}.json"
    )
    with metadata_path.open("r", encoding="utf-8") as fp:
        records = json.load(fp)

    # Unique scene ids, then a random subset of size floor(n * dev_percent)
    # becomes the dev split; the remainder is the train split.
    scenes = list({record["scene"] for record in records})
    n_dev = int(np.floor(len(scenes) * cfg.dev_percent))
    dev_scenes = random.sample(scenes, n_dev)
    train_scenes = list(set(scenes) - set(dev_scenes))

    with train_json.open("w", encoding="utf-8") as fp:
        json.dump(train_scenes, fp)
    with dev_json.open("w", encoding="utf-8") as fp:
        json.dump(dev_scenes, fp)
def listen(ear, signal, listener: Listener):
    """
    Generate MSBG processed signal

    :param ear: MSBG ear
    :param signal: binaural signal, shape (n_samples, 2)
    :param listener: listener object providing left/right audiograms
    :return: binaural signal, shape (n_samples, 2)
    """
    # Process each channel with the audiogram of the matching ear.
    ear.set_audiogram(listener.audiogram_left)
    out_l = ear.process(signal[:, 0])
    ear.set_audiogram(listener.audiogram_right)
    out_r = ear.process(signal[:, 1])

    # Processing may change the channel lengths; zero-pad the shorter channel
    # at the front (flip, pad at the tail, flip back) so both channels align.
    if len(out_l[0]) != len(out_r[0]):
        diff = len(out_l[0]) - len(out_r[0])
        if diff > 0:
            out_r[0] = np.flipud(pad(np.flipud(out_r[0]), len(out_l[0])))
        else:
            out_l[0] = np.flipud(pad(np.flipud(out_l[0]), len(out_r[0])))
    return np.concatenate([out_l, out_r]).T
def run_msbg_simulation(cfg, track):
    """Run the MSBG hearing-loss simulation over the train and test HA outputs.

    For every (scene, listener, system) signal, writes a ``*_HL-output.wav``
    next to the original HA output; signals already processed are skipped.

    :param cfg: Hydra config providing ``path.root`` and the ``MSBGEar`` kwargs.
    :param track: "" for the closed track, "_indep" for the open track.
    """
    for split in ["train", "test"]:
        dataset = split + track
        dataset_folder = Path(cfg.path.root) / f"clarity_CPC1_data_{split}"
        output_path = dataset_folder / "clarity_data/HA_outputs" / dataset

        scenes_json = dataset_folder / f"metadata/CPC1.{dataset}.json"
        with scenes_json.open("r", encoding="utf-8") as fp:
            scenes = json.load(fp)

        # The train release ships only the train listeners; test uses the
        # full listener file.
        if split == "train":
            listeners_json = dataset_folder / "metadata/listeners.CPC1_train.json"
        else:
            listeners_json = Path(dataset_folder) / "metadata/listeners.CPC1_all.json"
        listener_dict = Listener.load_listener_dict(listeners_json)

        # initialize ear
        ear = Ear(**cfg["MSBGEar"])

        for scene_dict in tqdm(scenes):
            listener_id = scene_dict["listener"]
            stem = f"{scene_dict['scene']}_{listener_id}_{scene_dict['system']}"
            signal_file = output_path / f"{stem}.wav"

            # signals to write; if all of them exist already, skip this scene
            signal_files_to_write = [Path(f"{output_path / stem}_HL-output.wav")]
            if all(f.exists() for f in signal_files_to_write):
                continue

            signal = read_signal(signal_file)
            listener = listener_dict[listener_id]
            signals_to_write = [listen(ear, signal, listener)]

            for out_signal, out_file in zip(signals_to_write, signal_files_to_write):
                write_signal(out_file, out_signal, MSBG_FS, floating_point=True)
def generate_data_split(
    orig_data_json: Path,
    orig_signal_folder: Path,
    target_data_folder: Path,
    data_split,
    split_data_list,
    if_msbg=False,
    if_ref=False,
):
    """Resample signals to 16 kHz left/right mono wavs and write CSV manifests.

    For every entry of the metadata JSON belonging to this split, reads the
    corresponding wav (HA output, MSBG-processed output, or anechoic reference),
    resamples it to ``target_sample_rate``, drops the first two seconds, writes
    separate left/right wav files, and records left/right/binaural CSV rows
    of the form [ID, duration, wav, spk_id, wrd].

    :param orig_data_json: CPC1 metadata JSON (entries with "signal",
        "listener", "prompt" keys).
    :param orig_signal_folder: folder containing the source wav files.
    :param target_data_folder: output root for the resampled wavs and CSVs.
    :param data_split: split name used in CSV file names; "test" selects all
        entries regardless of split_data_list.
    :param split_data_list: scene ids belonging to this split.
    :param if_msbg: read the MSBG-processed "*_HL-output.wav" signals.
    :param if_ref: read the anechoic reference scene signals instead.
    """
    with orig_data_json.open("r", encoding="utf-8") as fp:
        all_data_list = json.load(fp)

    # Suffix distinguishing the three signal variants in folder/CSV names.
    ext = ""
    if if_msbg:
        ext = "_msbg"
    elif if_ref:
        ext = "_ref"

    # NOTE(review): for a folder like ".../HA_outputs/train" (Path drops the
    # trailing slash) split("/")[-2] yields "HA_outputs", not "train" — confirm
    # this prefix is intended. Also not portable to Windows path separators.
    left_tgt_signal_folder = target_data_folder / (
        str(orig_signal_folder).split("/")[-2] + "_left" + ext
    )
    right_tgt_signal_folder = target_data_folder / (
        str(orig_signal_folder).split("/")[-2] + "_right" + ext
    )
    left_tgt_signal_folder.mkdir(parents=True, exist_ok=True)
    right_tgt_signal_folder.mkdir(parents=True, exist_ok=True)

    # CSV manifests: header row followed by one row per utterance/channel.
    csv_lines_left = [["ID", "duration", "wav", "spk_id", "wrd"]]
    csv_lines_right = [["ID", "duration", "wav", "spk_id", "wrd"]]
    csv_lines_binaural = [["ID", "duration", "wav", "spk_id", "wrd"]]
    left_csvfile = target_data_folder / f"left_{data_split}{ext}.csv"
    right_csvfile = target_data_folder / f"right_{data_split}{ext}.csv"
    binaural_csvfile = target_data_folder / f"binaural_{data_split}{ext}.csv"

    for item in tqdm(all_data_list):
        snt_id = item["signal"]
        # Scene id is the first "_"-token of the signal id; the "test" split
        # keeps every entry.
        if snt_id.split("_")[0] in split_data_list or data_split == "test":
            if if_msbg:
                wav_file = orig_signal_folder / f"{snt_id}_HL-output.wav"
            elif if_ref:
                wav_file = (
                    orig_signal_folder
                    / f"../../scenes/{snt_id.split('_')[0]}_target_anechoic.wav"
                )
            else:
                wav_file = orig_signal_folder / f"{snt_id}.wav"
            wav_file_left = left_tgt_signal_folder / f"{snt_id}.wav"
            wav_file_right = right_tgt_signal_folder / f"{snt_id}.wav"
            # NOTE(review): resuming skips the CSV rows as well as the wav
            # conversion, so a re-run writes manifests missing these entries —
            # confirm this is the intended resume behavior.
            if wav_file_left.exists() and wav_file_right.exists():
                continue
            spk_id = item["listener"]  # should be listener_id
            wrds = item["prompt"].upper()
            utt, orig_fs = sf.read(wav_file)
            # Resample as (channels, samples) then transpose back to
            # (samples, channels).
            utt_16k = resample(
                np.array(utt).transpose(), orig_sr=orig_fs, target_sr=target_sample_rate
            ).transpose()
            duration = (
                len(utt_16k[2 * target_sample_rate :, 0]) / target_sample_rate
            )  # Get rid of the first two seconds, as there is no speech
            sf.write(
                wav_file_left,
                utt_16k[2 * target_sample_rate :, 0],
                target_sample_rate,
            )
            sf.write(
                wav_file_right,
                utt_16k[2 * target_sample_rate :, 1],
                target_sample_rate,
            )
            csv_lines_left.append(
                [snt_id, str(duration), str(wav_file_left), spk_id, wrds]
            )
            csv_lines_right.append(
                [snt_id, str(duration), str(wav_file_right), spk_id, wrds]
            )
            # Binaural manifest interleaves both channels as separate rows.
            csv_lines_binaural.append(
                ["left_" + snt_id, str(duration), str(wav_file_left), spk_id, wrds]
            )
            csv_lines_binaural.append(
                ["right_" + snt_id, str(duration), str(wav_file_right), spk_id, wrds]
            )

    with left_csvfile.open(mode="w", encoding="utf-8") as csv_f:
        csv_writer = csv.writer(
            csv_f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL
        )
        for line in csv_lines_left:
            csv_writer.writerow(line)
    with right_csvfile.open(mode="w", encoding="utf-8") as csv_f:
        csv_writer = csv.writer(
            csv_f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL
        )
        for line in csv_lines_right:
            csv_writer.writerow(line)
    with binaural_csvfile.open(mode="w", encoding="utf-8") as csv_f:
        csv_writer = csv.writer(
            csv_f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL
        )
        for line in csv_lines_binaural:
            csv_writer.writerow(line)
def run_signal_generation_train(cfg, track):
    """Generate MSBG and reference ASR signals for the CPC1 training data.

    Reads the train/dev scene lists produced by run_data_split and, for each
    list, generates the MSBG-processed and anechoic-reference data splits.

    :param cfg: Hydra config providing ``path.cpc1_train_data`` and
        ``path.exp_folder``.
    :param track: "" for the closed track, "_indep" for the open track.
    """
    train_json_path = (
        Path(cfg.path.cpc1_train_data) / f"metadata/CPC1.train{track}.json"
    )
    train_signal_folder = (
        Path(cfg.path.cpc1_train_data) / f"clarity_data/HA_outputs/train{track}/"
    )
    target_folder = Path(cfg.path.exp_folder) / f"cpc1_asr_data{track}"

    lists_to_generate = [
        Path(cfg.path.exp_folder) / f"data_split/scene_train_list{track}.json",
        Path(cfg.path.exp_folder) / f"data_split/scene_dev_list{track}.json",
    ]
    datasets_to_generate = []
    for list_to_generate in lists_to_generate:
        with list_to_generate.open("r", encoding="utf-8") as fp:
            datasets_to_generate.append(json.load(fp))

    for list_to_generate, dataset in zip(lists_to_generate, datasets_to_generate):
        # The data_split name ("train"/"dev") is the second "_"-token of the
        # list file name, e.g. "scene_train_list.json" -> "train".
        data_split = list_to_generate.name.split("_")[1]
        generate_data_split(
            train_json_path,
            train_signal_folder,
            target_folder,
            data_split,
            dataset,
            if_msbg=True,
        )
        generate_data_split(
            train_json_path,
            train_signal_folder,
            target_folder,
            data_split,
            dataset,
            if_ref=True,
        )
def run_signal_generation_test(cfg, track):
    """Generate MSBG and reference ASR signals for the CPC1 test data.

    All test entries are processed (no scene list filtering), creating the
    MSBG-processed and anechoic-reference "test" data splits.

    :param cfg: Hydra config providing ``path.cpc1_test_data`` and
        ``path.exp_folder``.
    :param track: "" for the closed track, "_indep" for the open track.
    """
    test_json_path = Path(cfg.path.cpc1_test_data) / f"metadata/CPC1.test{track}.json"
    test_signal_folder = (
        Path(cfg.path.cpc1_test_data) / f"clarity_data/HA_outputs/test{track}/"
    )
    target_folder = Path(cfg.path.exp_folder) / f"cpc1_asr_data{track}"

    generate_data_split(
        test_json_path, test_signal_folder, target_folder, "test", [], if_msbg=True
    )
    generate_data_split(
        test_json_path, test_signal_folder, target_folder, "test", [], if_ref=True
    )
@hydra.main(config_path=".", config_name="config", version_base=None)
def run(cfg: DictConfig) -> None:
    """Prepare all E032 CPC1 data: split, MSBG simulation, ASR signals.

    :param cfg: Hydra configuration; ``cpc1_track`` must be "open" or "closed".
    :raises ValueError: if ``cfg.cpc1_track`` is neither "open" nor "closed".
    """
    if cfg.cpc1_track == "open":
        track = "_indep"
    elif cfg.cpc1_track == "closed":
        track = ""
    else:
        logger.error("cpc1_track has to be closed or open")
        raise ValueError("cpc1_track has to be closed or open")

    logger.info("Split all training data into train set and dev set.")
    run_data_split(cfg, track)
    logger.info("Running MSBG simulation.")  # fixed typo: was "MSGB"
    run_msbg_simulation(cfg, track)
    logger.info("Begin generating signals for ASR for cpc1_train_data.")
    run_signal_generation_train(cfg, track)
    logger.info("Begin generating signals for ASR for cpc1_test_data.")
    run_signal_generation_test(cfg, track)
# pylint: disable=no-value-for-parameter if __name__ == "__main__": run()