""" generate_signal.py This file is part of the repo "optical-regeneration" https://git.suuppl.dev/seppl/optical-regeneration.git Joseph Hopfmüller Copyright 2024 Licensed under the EUPL Full license text in LICENSE file """ import configparser from datetime import datetime import hashlib from pathlib import Path import time from matplotlib import pyplot as plt # noqa: F401 import numpy as np import path_fix import pypho default_config = f""" [glova] nos = 256 sps = 256 f0 = 193414489032258.06 symbolrate = 10e9 wisdom_dir = "{str((Path.home() / ".pypho"))}" flags = "FFTW_PATIENT" nthreads = 32 [fiber] length = 80000 gamma = 1.14 alpha = 0.2 D = 17 S = 0 birefsteps = 1 ; birefseed = 0xC0FFEE [signal] ; seed = 0xC0FFEE modulation = "pam" mod_order = 4 mod_depth = 0.8 max_jitter = 0.02 ; jitter_seed = 0xC0FFEE laser_power = 0 edfa_power = 3 edfa_nf = 5 pulse_shape = "gauss" fwhm = 0.33 [data] dir = "data" npy_dir = "npys" """ def get_config(config_file=None): """ DANGER! The function uses eval() to parse the config file. Do not use this function with untrusted input. """ if config_file is None: config_file = Path(__file__).parent / "signal_generation.ini" if not config_file.exists(): with open(config_file, "w") as f: f.write(default_config) config = configparser.ConfigParser() config.read(config_file) conf = {} for section in config.sections(): # print(f"[{section}]") conf[section] = {} for key in config[section]: # print(f"{key} = {config[section][key]}") conf[section][key] = eval(config[section][key]) # if isinstance(conf[section][key], str): # conf[section][key] = config[section][key].strip('"') return conf class pam_generator: def __init__( self, glova, mod_order=None, mod_depth=0.5, pulse_shape="gauss", fwhm=0.33, seed=None, ) -> None: self.glova = glova self.pulse_shape = pulse_shape self.modulation_depth = mod_depth self.mod_order = mod_order self.fwhm = fwhm self.seed = seed def __call__(self, E, symbols, max_jitter=0): symbols_x = symbols[0] / (self.mod_order or np.max(symbols[0])) symbols_y = symbols[1] / (self.mod_order or np.max(symbols[1])) diffs_x = np.diff(symbols_x, prepend=symbols_x[0]) diffs_y = np.diff(symbols_y, prepend=symbols_y[0]) max_jitter = int(round(max_jitter * self.glova.sps)) digital_x = self.generate_digital_signal(diffs_x, max_jitter) digital_y = self.generate_digital_signal(diffs_y, max_jitter) digital_x = np.pad( digital_x, (0, self.glova.sps // 2), "constant", constant_values=(0, 0) ) digital_y = np.pad( digital_y, (0, self.glova.sps // 2), "constant", constant_values=(0, 0) ) if self.pulse_shape == "gauss": wavelet = self.gauss(oversampling=6) else: raise ValueError(f"Unknown pulse shape: {self.pulse_shape}") # create analog signal of diff of symbols E_x = np.convolve(digital_x, wavelet) E_y = np.convolve(digital_y, wavelet) # convert to pam and set modulation depth (scale and move up such that 1 stays at 1) E_x = np.cumsum(E_x) * self.modulation_depth + (1 - self.modulation_depth) E_y = np.cumsum(E_y) * self.modulation_depth + (1 - self.modulation_depth) # cut off the wavelet tails E_x = E_x[self.glova.sps // 2 + len(wavelet) // 2 - 1 : -len(wavelet) // 2] E_y = E_y[self.glova.sps // 2 + len(wavelet) // 2 - 1 : -len(wavelet) // 2] # modulate the laser E[0]["E"][0] = np.sqrt(np.multiply(np.square(E[0]["E"][0]), E_x)) E[0]["E"][1] = np.sqrt(np.multiply(np.square(E[0]["E"][1]), E_y)) return E def generate_digital_signal(self, symbols, max_jitter=0): rs = np.random.RandomState(self.seed) signal = np.zeros(self.glova.nos * self.glova.sps) for index in range(self.glova.nos): jitter = max_jitter != 0 and rs.randint(-max_jitter, max_jitter) signal_index = index * self.glova.sps + jitter if signal_index < 0: continue if signal_index >= len(signal): continue signal[signal_index] = symbols[index] return signal def gauss(self, oversampling=1): sample_points = np.linspace( -oversampling * self.glova.sps, oversampling * self.glova.sps, oversampling * 2 * self.glova.sps, endpoint=True, ) sigma = self.fwhm / (1 * np.sqrt(2 * np.log(2))) * self.glova.sps pulse = ( 1 / (sigma * np.sqrt(2 * np.pi)) * np.exp(-np.square(sample_points) / (2 * np.square(sigma))) ) return pulse def initialize_fiber_and_data(config, input_data_override=None): py_glova = pypho.setup( nos=config["glova"]["nos"], sps=config["glova"]["sps"], f0=config["glova"]["f0"], symbolrate=config["glova"]["symbolrate"], wisdom_dir=config["glova"]["wisdom_dir"], flags=config["glova"]["flags"], nthreads=config["glova"]["nthreads"], ) c_glova = pypho.cfiber.GlovaWrapper.from_setup(py_glova) c_data = pypho.cfiber.DataWrapper(py_glova.sps * py_glova.nos) py_edfa = pypho.oamp(py_glova, Pmean=config["signal"]["edfa_power"], NF=config["signal"]["edfa_nf"]) if input_data_override is not None: c_data.E_in = input_data_override[0] noise = input_data_override[1] else: config["signal"]["seed"] = config["signal"].get( "seed", (int(time.time() * 1000)) % 2**32 ) config["signal"]["jitter_seed"] = config["signal"].get( "jitter_seed", (int(time.time() * 1000)) % 2**32 ) symbolsrc = pypho.symbols( py_glova, py_glova.nos, pattern="ones", seed=config["signal"]["seed"] ) laser = pypho.lasmod( py_glova, power=config["signal"]["laser_power"]+1.5, Df=0, theta=np.pi / 4 ) modulator = pam_generator( py_glova, mod_depth=config["signal"]["mod_depth"], pulse_shape=config["signal"]["pulse_shape"], fwhm=config["signal"]["fwhm"], seed=config["signal"]["jitter_seed"], ) symbols_x = symbolsrc(pattern="random", p1=config["signal"]["mod_order"]) symbols_y = symbolsrc(pattern="random", p1=config["signal"]["mod_order"]) symbols_x[:3] = 0 symbols_y[:3] = 0 cw = laser() source_signal = modulator(E=cw, symbols=(symbols_x, symbols_y)) source_signal = py_edfa(E=source_signal) c_data.E_in = source_signal[0]["E"] noise = source_signal[0]["noise"] py_fiber = pypho.fiber( glova=py_glova, l=config["fiber"]["length"], alpha=pypho.functions.dB_to_Neper(config["fiber"]["alpha"]) / 1000, gamma=config["fiber"]["gamma"], D=config["fiber"]["d"], S=config["fiber"]["s"], ) if config["fiber"]["birefsteps"] > 0: config["fiber"]["birefseed"] = config["fiber"].get( "birefseed", (int(time.time() * 1000)) % 2**32 ) py_fiber.birefarray = pypho.birefringence_segment.create_pmd_fibre( py_fiber.l, py_fiber.l / config["fiber"]["birefsteps"], 0, config["fiber"]["birefseed"], ) c_params = pypho.cfiber.ParamsWrapper.from_fiber( py_fiber, max_step=py_fiber.l if py_fiber.gamma == 0 else 200 ) c_fiber = pypho.cfiber.FiberWrapper(c_data, c_params, c_glova) return c_fiber, c_data, noise, py_edfa def save_data(data, config): data_dir = Path(config["data"]["dir"]) npy_dir = config["data"].get("npy_dir", "") save_dir = data_dir / npy_dir if len(npy_dir) else data_dir save_dir.mkdir(parents=True, exist_ok=True) save_data = np.column_stack([ data.E_in[0], data.E_in[1], data.E_out[0], data.E_out[1], ]) timestamp = datetime.now() seed = config["signal"].get("seed", False) jitter_seed = config["signal"].get("jitter_seed", False) birefseed = config["fiber"].get("birefseed", False) config_content = "\n".join(( f"; Generated by {str(Path(__file__).name)} @ {timestamp.strftime('%Y-%m-%d %H:%M:%S')}", "[glova]", f"sps = {config['glova']['sps']}", f"nos = {config['glova']['nos']}", f"f0 = {config['glova']['f0']}", f"symbolrate = {config['glova']['symbolrate']}", f'wisdom_dir = "{config["glova"]["wisdom_dir"]}"', f'flags = "{config["glova"]["flags"]}"', f"nthreads = {config['glova']['nthreads']}", " ", "[fiber]", f"length = {config['fiber']['length']}", f"gamma = {config['fiber']['gamma']}", f"alpha = {config['fiber']['alpha']}", f"D = {config['fiber']['d']}", f"S = {config['fiber']['s']}", f"birefsteps = {config['fiber']['birefsteps']}", f"birefseed = {hex(birefseed)}" if birefseed else "; birefseed = not set", "", "[signal]", f"seed = {hex(seed)}" if seed else "; seed = not set", "", f'modulation = "{config["signal"]["modulation"]}"', f"mod_order = {config['signal']['mod_order']}", f"mod_depth = {config['signal']['mod_depth']}", "" f"max_jitter = {config['signal']['max_jitter']}", f"jitter_seed = {hex(jitter_seed)}" if jitter_seed else "; jitter_seed = not set", "" f"laser_power = {config['signal']['laser_power']}", f"edfa_power = {config['signal']['edfa_power']}", f"edfa_nf = {config['signal']['edfa_nf']}", "" f'pulse_shape = "{config["signal"]["pulse_shape"]}"', f"fwhm = {config['signal']['fwhm']}", "", "[data]", f'dir = "{str(data_dir)}"', f'npy_dir = "{npy_dir}"', "file = " )) config_hash = hashlib.md5(config_content.encode()).hexdigest() save_file = f"{config_hash}.npy" config_content += f'"{str(save_file)}"\n' filename_components = ( timestamp.strftime("%Y%m%d-%H%M%S"), config["glova"]["sps"], config["glova"]["nos"], config["fiber"]["length"], config["fiber"]["gamma"], config["fiber"]["alpha"], config["fiber"]["d"], config["fiber"]["s"], f"{config['signal']['modulation'].upper()}{config['signal']['mod_order']}", config["fiber"]["birefsteps"], ) lookup_file = "-".join(map(str, filename_components)) + ".ini" with open(data_dir / lookup_file, "w") as f: f.write(config_content) np.save(save_dir / save_file, save_data) print("Saved config to", data_dir / lookup_file) print("Saved data to", save_dir / save_file) def length_loop(config, lengths, incremental=False): lengths = sorted(lengths) input_override = None for lind, length in enumerate(lengths): # print(f"\nGenerating data for fiber length {length}") if lind > 0 and incremental: # set the length to the difference between the current and previous length -> incremental length = lengths[lind] - lengths[lind - 1] if incremental: print( f"\nGenerating data for fiber length {lengths[lind]}m [using {length}m increment]" ) else: print(f"\nGenerating data for fiber length {length}m") config["fiber"]["length"] = length # set the input data to the output data of the previous run cfiber, cdata, noise, edfa = initialize_fiber_and_data( config, input_data_override=input_override ) if lind == 0: cdata_orig = cdata mean_power_in = np.sum(pypho.functions.getpower_W(cdata.E_in)) print( f"Mean power in: {mean_power_in:.3e} W ({pypho.functions.W_to_dBm(mean_power_in):.3e} dBm)" ) cfiber() mean_power_out = np.sum(pypho.functions.getpower_W(cdata.E_out)) print( f"Mean power out: {mean_power_out:.3e} W ({pypho.functions.W_to_dBm(mean_power_out):.3e} dBm)" ) input_override = (cdata.E_out, noise) cdata.E_in = cdata_orig.E_in config["fiber"]["length"] = lengths[lind] E_tmp = [{'E': cdata.E_out, 'noise': noise*(-cfiber.params.l*cfiber.params.alpha)}] E_tmp = edfa(E=E_tmp) cdata.E_out = E_tmp[0]['E'] save_data(cdata, config) in_out_eyes(cfiber, cdata) def single_run_with_plot(config, save=True): cfiber, cdata, noise, edfa = initialize_fiber_and_data(config) mean_power_in = np.sum(pypho.functions.getpower_W(cdata.E_in)) print( f"Mean power in: {mean_power_in:.3e} W ({pypho.functions.W_to_dBm(mean_power_in):.3e} dBm)" ) cfiber() mean_power_out = np.sum(pypho.functions.getpower_W(cdata.E_out)) print( f"Mean power out: {mean_power_out:.3e} W ({pypho.functions.W_to_dBm(mean_power_out):.3e} dBm)" ) E_tmp = [{'E': cdata.E_out, 'noise': noise*(-cfiber.params.l*cfiber.params.alpha)}] E_tmp = edfa(E=E_tmp) cdata.E_out = E_tmp[0]['E'] if save: save_data(cdata, config) in_out_eyes(cfiber, cdata) def in_out_eyes(cfiber, cdata): fig, axs = plt.subplots(2, 2, sharex=True, sharey=True) eye_head = min(cfiber.glova.nos, 2000) symbolrate_scale = 1e12 amplitude_scale = 1e3 plot_eye_diagram( amplitude_scale * np.abs(cdata.E_in[0]) ** 2, 2 * cfiber.glova.sps, normalize=False, samplerate=cfiber.glova.symbolrate * cfiber.glova.sps / symbolrate_scale, head=eye_head, ax=axs[0][0], show=False, ) plot_eye_diagram( amplitude_scale * np.abs(cdata.E_out[0]) ** 2, 2 * cfiber.glova.sps, normalize=False, samplerate=cfiber.glova.symbolrate * cfiber.glova.sps / symbolrate_scale, head=eye_head, ax=axs[0][1], color="C1", show=False, ) plot_eye_diagram( amplitude_scale * np.abs(cdata.E_in[1]) ** 2, 2 * cfiber.glova.sps, normalize=False, samplerate=cfiber.glova.symbolrate * cfiber.glova.sps / symbolrate_scale, head=eye_head, ax=axs[1][0], show=False, ) plot_eye_diagram( amplitude_scale * np.abs(cdata.E_out[1]) ** 2, 2 * cfiber.glova.sps, normalize=False, samplerate=cfiber.glova.symbolrate * cfiber.glova.sps / symbolrate_scale, head=eye_head, ax=axs[1][1], color="C1", show=False, ) title_map = [ ["Input x", "Output x"], ["Input y", "Output y"], ] title_map = np.array(title_map) for ax, title in zip(axs.flatten(), title_map.flatten()): ax.grid(True) ax.set_xlabel("Time [ps]") ax.set_ylabel("Power [mW]") ax.set_title(title) fig.tight_layout() plt.show() def plot_eye_diagram( signal: np.ndarray, eye_width, offset=0, *, head=None, samplerate=1, normalize=True, ax=None, color="C0", show=True, ): ax = ax or plt.gca() if head is not None: signal = signal[: head * eye_width] if normalize: signal = signal / np.max(signal) slices = np.lib.stride_tricks.sliding_window_view(signal, eye_width + 1)[ offset % (eye_width + 1) :: eye_width ] plt_ax = np.arange(-eye_width // 2, eye_width // 2 + 1) / samplerate for slice in slices: ax.plot(plt_ax, slice, color=color, alpha=0.1) ax.grid() if show: plt.show() if __name__ == "__main__": path_fix.show_log() config = get_config() length_ranges = [1000, 10000] length_scales = [1, 2, 3, 4, 5, 6, 7, 8, 9] lengths = [ length_scale * length_range for length_range in length_ranges for length_scale in length_scales ] lengths.append(max(length_ranges)*10) # length_loop(config, lengths) single_run_with_plot(config, save=False)