signal generation working

This commit is contained in:
Joseph Hopfmüller
2024-11-15 16:46:09 +01:00
parent 7ac4e48c20
commit 0564969a50
2 changed files with 323 additions and 139 deletions

View File

@@ -3,24 +3,20 @@ from datetime import datetime
import hashlib
from pathlib import Path
import time
from matplotlib import pyplot as plt
import scipy
from matplotlib import pyplot as plt # noqa: F401
import numpy as np
from rich import inspect
import _path_fix # noqa: F401
import pypho
import io
# import inspect
default_config = f"""
[glova]
sps = 256
nos = 256
sps = 256
f0 = 193414489032258.06
symbolrate = 10e9
wisdom_dir = "{str((Path.home()/".pypho"))}"
wisdom_dir = "{str((Path.home() / ".pypho"))}"
flags = "FFTW_PATIENT"
nthreads = 32
@@ -34,19 +30,26 @@ birefsteps = 1
; birefseed = 0xC0FFEE
[signal]
; seed = 0xC0FFEE
modulation = "pam"
mod_order = 4
mod_depth = 0.5
laser_power = 0
; seed = 0xC0FFEE
pulse_shape = "gauss_rz"
fwhm = 0.33
mod_depth = 0.8
max_jitter = 0.02
; jitter_seed = 0xC0FFEE
[script]
data_dir = "{str((Path.home()/".pypho"/"data"))}"
"""
laser_power = 0
edfa_power = 3
edfa_nf = 5
pulse_shape = "gauss"
fwhm = 0.33
[data]
dir = "data"
npy_dir = "npys"
"""
def get_config(config_file=None):
@@ -72,8 +75,17 @@ def get_config(config_file=None):
# conf[section][key] = config[section][key].strip('"')
return conf
class pam_generator:
def __init__(self, glova, mod_order=None, mod_depth=0.5, pulse_shape='gauss', fwhm=0.33, seed=None) -> None:
def __init__(
self,
glova,
mod_order=None,
mod_depth=0.5,
pulse_shape="gauss",
fwhm=0.33,
seed=None,
) -> None:
self.glova = glova
self.pulse_shape = pulse_shape
self.modulation_depth = mod_depth
@@ -82,21 +94,24 @@ class pam_generator:
self.seed = seed
def __call__(self, E, symbols, max_jitter=0):
symbols_x = symbols[0]/(self.mod_order or np.max(symbols[0]))
symbols_y = symbols[1]/(self.mod_order or np.max(symbols[1]))
symbols_x = symbols[0] / (self.mod_order or np.max(symbols[0]))
symbols_y = symbols[1] / (self.mod_order or np.max(symbols[1]))
diffs_x = np.diff(symbols_x, prepend=symbols_x[0])
diffs_y = np.diff(symbols_y, prepend=symbols_y[0])
max_jitter = int(round(max_jitter*self.glova.sps))
max_jitter = int(round(max_jitter * self.glova.sps))
digital_x = self.generate_digital_signal(diffs_x, max_jitter)
digital_y = self.generate_digital_signal(diffs_y, max_jitter)
digital_x = np.pad(digital_x, (0, self.glova.sps//2), 'constant', constant_values=(0,0))
digital_y = np.pad(digital_y, (0, self.glova.sps//2), 'constant', constant_values=(0,0))
digital_x = np.pad(
digital_x, (0, self.glova.sps // 2), "constant", constant_values=(0, 0)
)
digital_y = np.pad(
digital_y, (0, self.glova.sps // 2), "constant", constant_values=(0, 0)
)
if self.pulse_shape == 'gauss':
if self.pulse_shape == "gauss":
wavelet = self.gauss(oversampling=6)
else:
raise ValueError(f"Unknown pulse shape: {self.pulse_shape}")
@@ -106,108 +121,154 @@ class pam_generator:
E_y = np.convolve(digital_y, wavelet)
# convert to pam and set modulation depth (scale and move up such that 1 stays at 1)
E_x = np.cumsum(E_x)*self.modulation_depth + (1-self.modulation_depth)
E_y = np.cumsum(E_y)*self.modulation_depth + (1-self.modulation_depth)
E_x = np.cumsum(E_x) * self.modulation_depth + (1 - self.modulation_depth)
E_y = np.cumsum(E_y) * self.modulation_depth + (1 - self.modulation_depth)
# cut off the wavelet tails
E_x = E_x[self.glova.sps//2+len(wavelet)//2-1:-len(wavelet)//2]
E_y = E_y[self.glova.sps//2+len(wavelet)//2-1:-len(wavelet)//2]
E_x = E_x[self.glova.sps // 2 + len(wavelet) // 2 - 1 : -len(wavelet) // 2]
E_y = E_y[self.glova.sps // 2 + len(wavelet) // 2 - 1 : -len(wavelet) // 2]
# modulate the laser
np.multiply(E[0]['E'][0],E_x, out=E[0]['E'][0])
np.multiply(E[0]['E'][1],E_y, out=E[0]['E'][1])
E[0]["E"][0] = np.sqrt(np.multiply(np.square(E[0]["E"][0]), E_x))
E[0]["E"][1] = np.sqrt(np.multiply(np.square(E[0]["E"][1]), E_y))
return E
def generate_digital_signal(self, symbols, max_jitter=0):
rs = np.random.RandomState(self.seed)
signal = np.zeros(self.glova.nos*self.glova.sps)
signal = np.zeros(self.glova.nos * self.glova.sps)
for index in range(self.glova.nos):
jitter = max_jitter != 0 and rs.randint(-max_jitter, max_jitter)
signal_index = index*self.glova.sps + jitter
signal_index = index * self.glova.sps + jitter
if signal_index < 0:
continue
if signal_index >= len(signal):
continue
signal[signal_index] = symbols[index]
return signal
def gauss(self, oversampling=1):
sample_points = np.linspace(-oversampling*self.glova.sps, oversampling*self.glova.sps, oversampling*2*self.glova.sps, endpoint=True)
sigma = self.fwhm/(1*np.sqrt(2*np.log(2)))*self.glova.sps
pulse = 1/(sigma*np.sqrt(2*np.pi))*np.exp(-np.square(sample_points)/(2*np.square(sigma)))
sample_points = np.linspace(
-oversampling * self.glova.sps,
oversampling * self.glova.sps,
oversampling * 2 * self.glova.sps,
endpoint=True,
)
sigma = self.fwhm / (1 * np.sqrt(2 * np.log(2))) * self.glova.sps
pulse = (
1
/ (sigma * np.sqrt(2 * np.pi))
* np.exp(-np.square(sample_points) / (2 * np.square(sigma)))
)
return pulse
def initialize_fiber_and_data(config, input_data_override=None):
py_glova = pypho.setup(
nos = config['glova']['nos'],
sps = config['glova']['sps'],
f0 = config['glova']['f0'],
symbolrate=config['glova']['symbolrate'],
wisdom_dir = config['glova']['wisdom_dir'],
flags = config['glova']['flags'],
nthreads = config['glova']['nthreads'],
nos=config["glova"]["nos"],
sps=config["glova"]["sps"],
f0=config["glova"]["f0"],
symbolrate=config["glova"]["symbolrate"],
wisdom_dir=config["glova"]["wisdom_dir"],
flags=config["glova"]["flags"],
nthreads=config["glova"]["nthreads"],
)
c_glova = pypho.cfiber.GlovaWrapper.from_setup(py_glova)
c_data = pypho.cfiber.DataWrapper(py_glova.sps * py_glova.nos)
py_edfa = pypho.oamp(py_glova, Pmean=config["signal"]["edfa_power"], NF=config["signal"]["edfa_nf"])
if input_data_override is not None:
c_data.E_in = input_data_override
c_data.E_in = input_data_override[0]
noise = input_data_override[1]
else:
config['signal']['seed'] = config['signal'].get('seed',(int(time.time()*1000))%2**32)
config['signal']['jitter_seed'] = config['signal'].get('jitter_seed', (int(time.time()*1000))%2**32)
symbolsrc = pypho.symbols(py_glova, py_glova.nos, pattern='ones', seed=config['signal']['seed'])
laser = pypho.lasmod(py_glova, power=config['signal']['laser_power'], Df=0, theta=np.pi/4)
modulator = pam_generator(py_glova, mod_depth=config['signal']['mod_depth'], pulse_shape=config['signal']['pulse_shape'], fwhm=config['signal']['fwhm'], seed=config['signal']['jitter_seed'])
config["signal"]["seed"] = config["signal"].get(
"seed", (int(time.time() * 1000)) % 2**32
)
config["signal"]["jitter_seed"] = config["signal"].get(
"jitter_seed", (int(time.time() * 1000)) % 2**32
)
symbolsrc = pypho.symbols(
py_glova, py_glova.nos, pattern="ones", seed=config["signal"]["seed"]
)
laser = pypho.lasmod(
py_glova, power=config["signal"]["laser_power"]+1.5, Df=0, theta=np.pi / 4
)
modulator = pam_generator(
py_glova,
mod_depth=config["signal"]["mod_depth"],
pulse_shape=config["signal"]["pulse_shape"],
fwhm=config["signal"]["fwhm"],
seed=config["signal"]["jitter_seed"],
)
symbols_x = symbolsrc(pattern='random', p1=config['signal']['mod_order'])
symbols_y = symbolsrc(pattern='random', p1=config['signal']['mod_order'])
symbols_x = symbolsrc(pattern="random", p1=config["signal"]["mod_order"])
symbols_y = symbolsrc(pattern="random", p1=config["signal"]["mod_order"])
symbols_x[:3] = 0
symbols_y[:3] = 0
cw = laser()
cw[0]['E']*=np.sqrt(2)
source_signal = modulator(E=cw, symbols=(symbols_x, symbols_y))
source_signal = py_edfa(E=source_signal)
c_data.E_in = source_signal[0]['E']
c_data.E_in = source_signal[0]["E"]
noise = source_signal[0]["noise"]
py_fiber = pypho.fiber(
glova = py_glova,
l=config['fiber']['length'],
alpha=pypho.functions.dB_to_Neper(config['fiber']['alpha'])/1000,
gamma=config['fiber']['gamma'],
D=config['fiber']['d'],
S=config['fiber']['s'],
glova=py_glova,
l=config["fiber"]["length"],
alpha=pypho.functions.dB_to_Neper(config["fiber"]["alpha"]) / 1000,
gamma=config["fiber"]["gamma"],
D=config["fiber"]["d"],
S=config["fiber"]["s"],
)
if config["fiber"]["birefsteps"] > 0:
config["fiber"]["birefseed"] = config["fiber"].get(
"birefseed", (int(time.time() * 1000)) % 2**32
)
py_fiber.birefarray = pypho.birefringence_segment.create_pmd_fibre(
py_fiber.l,
py_fiber.l / config["fiber"]["birefsteps"],
0,
config["fiber"]["birefseed"],
)
c_params = pypho.cfiber.ParamsWrapper.from_fiber(
py_fiber, max_step=1e3 if py_fiber.gamma == 0 else 200
)
if config['fiber']['birefsteps'] > 0:
config['fiber']['birefseed'] = config['fiber'].get('birefseed', (int(time.time()*1000))%2**32)
py_fiber.birefarray = pypho.birefringence_segment.create_pmd_fibre(py_fiber.l, py_fiber.l/config['fiber']['birefsteps'], 0, config['fiber']['birefseed'])
c_params = pypho.cfiber.ParamsWrapper.from_fiber(py_fiber, max_step=1e3)
c_fiber = pypho.cfiber.FiberWrapper(c_data, c_params, c_glova)
return c_fiber, c_data
return c_fiber, c_data, noise, py_edfa
def save_data(data, config):
data_dir = Path(config['script']['data_dir'])
save_dir = data_dir
data_dir = Path(config["data"]["dir"])
npy_dir = config["data"].get("npy_dir", "")
save_dir = data_dir / npy_dir if len(npy_dir) else data_dir
save_dir.mkdir(parents=True, exist_ok=True)
save_data = np.column_stack([data.E_in[0], data.E_in[1], data.E_out[0], data.E_out[1]])
save_data = np.column_stack([
data.E_in[0],
data.E_in[1],
data.E_out[0],
data.E_out[1],
])
timestamp = datetime.now()
config_content = '\n'.join((
f"; Generated by {str(Path(__file__).name)} @ {timestamp.strftime("%Y-%m-%d %H:%M:%S")}",
seed = config["signal"].get("seed", False)
jitter_seed = config["signal"].get("jitter_seed", False)
birefseed = config["fiber"].get("birefseed", False)
config_content = "\n".join((
f"; Generated by {str(Path(__file__).name)} @ {timestamp.strftime('%Y-%m-%d %H:%M:%S')}",
"[glova]",
f"sps = {config['glova']['sps']}",
f"nos = {config['glova']['nos']}",
f"f0 = {config['glova']['f0']}",
f"symbolrate = {config['glova']['symbolrate']}",
f'wisdom_dir = "{config['glova']['wisdom_dir']}"',
f'wisdom_dir = "{config["glova"]["wisdom_dir"]}"',
f'flags = "{config["glova"]["flags"]}"',
f"nthreads = {config['glova']['nthreads']}",
"",
" ",
"[fiber]",
f"length = {config['fiber']['length']}",
f"gamma = {config['fiber']['gamma']}",
@@ -215,34 +276,43 @@ def save_data(data, config):
f"D = {config['fiber']['d']}",
f"S = {config['fiber']['s']}",
f"birefsteps = {config['fiber']['birefsteps']}",
f"birefseed = {config['fiber'].get('birefseed', 'not set')}",
f"birefseed = {hex(birefseed)}" if birefseed else "; birefseed = not set",
"",
"[signal]",
f'modulation = "{config['signal']['modulation']}"',
f"seed = {hex(seed)}" if seed else "; seed = not set",
"",
f'modulation = "{config["signal"]["modulation"]}"',
f"mod_order = {config['signal']['mod_order']}",
f"mod_depth = {config['signal']['mod_depth']}",
f"seed = {config['signal'].get('seed', 'not set')}",
""
f"max_jitter = {config['signal']['max_jitter']}",
f"jitter_seed = {hex(jitter_seed)}" if jitter_seed else "; jitter_seed = not set",
""
f"laser_power = {config['signal']['laser_power']}",
f"edfa_power = {config['signal']['edfa_power']}",
f"edfa_nf = {config['signal']['edfa_nf']}",
""
f'pulse_shape = "{config["signal"]["pulse_shape"]}"',
f"fwhm = {config['signal']['fwhm']}",
f"max_jitter = {config['signal']['max_jitter']}",
f"jitter_seed = {config['signal'].get('jitter_seed', 'not set')}",
"",
"[script]",
f'data_dir = "{config["script"]["data_dir"]}"',
"[data]",
f'dir = "{str(data_dir)}"',
f'npy_dir = "{npy_dir}"',
"file = "
))
config_hash = hashlib.md5(config_content.encode()).hexdigest()
save_file = f"npys/{config_hash}.npy"
config_content += f'\n\n[DATA]\nfile = "{str(save_file)}\n"'
save_file = f"{config_hash}.npy"
config_content += f'"{str(save_file)}"\n'
filename_components = (
timestamp.strftime("%Y%m%d-%H%M%S"),
config['glova']['sps'],
config['glova']['nos'],
config['fiber']['length'],
config['fiber']['gamma'],
config['fiber']['alpha'],
config['fiber']['d'],
config['fiber']['s'],
config["glova"]["sps"],
config["glova"]["nos"],
config["fiber"]["length"],
config["fiber"]["gamma"],
config["fiber"]["alpha"],
config["fiber"]["d"],
config["fiber"]["s"],
f"{config['signal']['modulation'].upper()}{config['signal']['mod_order']}",
)
@@ -255,65 +325,174 @@ def save_data(data, config):
print("Saved config to", data_dir / lookup_file)
print("Saved data to", save_dir / f"{config_hash}.npy")
def length_loop(config):
# loop over lengths
# lengths = [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 40000, 50000, 60000, 70000, 80000, 90000, 100000]
ranges = [1, 10, 100, 1000, 10000, 100000]
lengths = [list(range(length_range,10*length_range, length_range)) for length_range in ranges]
lengths = [length for length_range in lengths for length in length_range]
def length_loop(config, lengths):
lengths = sorted(lengths)
input_override = None
for lind, length in enumerate(lengths):
# print(f"\nGenerating data for fiber length {length}")
if lind > 0:
# set the length to the difference between the current and previous length -> incremental
length = lengths[lind] - lengths[lind-1]
print(f"\nGenerating data for fiber length {lengths[lind]}m -> {length}m")
config['fiber']['length'] = length
length = lengths[lind] - lengths[lind - 1]
print(
f"\nGenerating data for fiber length {lengths[lind]}m [using {length}m increment]"
)
config["fiber"]["length"] = length
# set the input data to the output data of the previous run
cfiber, cdata = initialize_fiber_and_data(config, input_data_override=input_override)
cfiber, cdata, noise, edfa = initialize_fiber_and_data(
config, input_data_override=input_override
)
if lind == 0:
cdata_orig = cdata
energy_in = np.sum(np.abs(cdata.E_in[0])**2 + np.abs(cdata.E_in[1])**2)/(cfiber.glova.symbolrate*cfiber.glova.sps)
print(f"Energy in: {energy_in} J")
mean_power_in = np.sum(pypho.functions.getpower_W(cdata.E_in))
print(
f"Mean power in: {mean_power_in:.3e} W ({pypho.functions.W_to_dBm(mean_power_in):.3e} dBm)"
)
cfiber()
energy_out = np.sum(np.abs(cdata.E_out[0])**2 + np.abs(cdata.E_out[1])**2)/(cfiber.glova.symbolrate*cfiber.glova.sps)
print(f"Energy out: {energy_out} J")
input_override = cdata.E_out
mean_power_out = np.sum(pypho.functions.getpower_W(cdata.E_out))
print(
f"Mean power out: {mean_power_out:.3e} W ({pypho.functions.W_to_dBm(mean_power_out):.3e} dBm)"
)
input_override = (cdata.E_out, noise)
cdata.E_in = cdata_orig.E_in
config['fiber']['length'] = lengths[lind]
config["fiber"]["length"] = lengths[lind]
E_tmp = [{'E': cdata.E_out, 'noise': noise*(-cfiber.params.l*cfiber.params.alpha)}]
E_tmp = edfa(E=E_tmp)
cdata.E_out = E_tmp[0]['E']
save_data(cdata, config)
in_out_eyes(cfiber, cdata)
def single_run_with_plot(config):
cfiber, cdata, noise, edfa = initialize_fiber_and_data(config)
mean_power_in = np.sum(pypho.functions.getpower_W(cdata.E_in))
print(
f"Mean power in: {mean_power_in:.3e} W ({pypho.functions.W_to_dBm(mean_power_in):.3e} dBm)"
)
cfiber()
mean_power_out = np.sum(pypho.functions.getpower_W(cdata.E_out))
print(
f"Mean power out: {mean_power_out:.3e} W ({pypho.functions.W_to_dBm(mean_power_out):.3e} dBm)"
)
E_tmp = [{'E': cdata.E_out, 'noise': noise*(-cfiber.params.l*cfiber.params.alpha)}]
E_tmp = edfa(E=E_tmp)
cdata.E_out = E_tmp[0]['E']
save_data(cdata, config)
in_out_eyes(cfiber, cdata)
def in_out_eyes(cfiber, cdata):
fig, axs = plt.subplots(2, 2, sharex=True, sharey=True)
eye_head = min(cfiber.glova.nos, 2000)
symbolrate_scale = 1e12
amplitude_scale = 1e3
plot_eye_diagram(
amplitude_scale * np.abs(cdata.E_in[0]) ** 2,
2 * cfiber.glova.sps,
normalize=False,
samplerate=cfiber.glova.symbolrate * cfiber.glova.sps / symbolrate_scale,
head=eye_head,
ax=axs[0][0],
show=False,
)
plot_eye_diagram(
amplitude_scale * np.abs(cdata.E_out[0]) ** 2,
2 * cfiber.glova.sps,
normalize=False,
samplerate=cfiber.glova.symbolrate * cfiber.glova.sps / symbolrate_scale,
head=eye_head,
ax=axs[0][1],
color="C1",
show=False,
)
plot_eye_diagram(
amplitude_scale * np.abs(cdata.E_in[1]) ** 2,
2 * cfiber.glova.sps,
normalize=False,
samplerate=cfiber.glova.symbolrate * cfiber.glova.sps / symbolrate_scale,
head=eye_head,
ax=axs[1][0],
show=False,
)
plot_eye_diagram(
amplitude_scale * np.abs(cdata.E_out[1]) ** 2,
2 * cfiber.glova.sps,
normalize=False,
samplerate=cfiber.glova.symbolrate * cfiber.glova.sps / symbolrate_scale,
head=eye_head,
ax=axs[1][1],
color="C1",
show=False,
)
title_map = [
["Input x", "Output x"],
["Input y", "Output y"],
]
title_map = np.array(title_map)
for ax, title in zip(axs.flatten(), title_map.flatten()):
ax.grid(True)
ax.set_xlabel("Time [ps]")
ax.set_ylabel("Power [mW]")
ax.set_title(title)
fig.tight_layout()
plt.show()
def plot_eye_diagram(
signal: np.ndarray,
eye_width,
offset=0,
*,
head=None,
samplerate=1,
normalize=True,
ax=None,
color="C0",
show=True,
):
ax = ax or plt.gca()
if head is not None:
signal = signal[: head * eye_width]
if normalize:
signal = signal / np.max(signal)
slices = np.lib.stride_tricks.sliding_window_view(signal, eye_width + 1)[
offset % (eye_width + 1) :: eye_width
]
plt_ax = np.arange(-eye_width // 2, eye_width // 2 + 1) / samplerate
for slice in slices:
ax.plot(plt_ax, slice, color=color, alpha=0.1)
ax.grid()
if show:
plt.show()
if __name__ == "__main__":
config = get_config()
length_loop(config)
length_ranges = [1, 10, 100, 1000, 10000]
length_ranges = [1000, 10000]
length_scales = [1, 2, 5]
# cfiber, cdata = initialize_fiber_and_data(config)
# cfiber()
lengths = [
length_scale * length_range
for length_range in length_ranges
for length_scale in length_scales
]
lengths.append(max(length_ranges))
# energy_in = np.sum(np.abs(cdata.E_in[0])**2 + np.abs(cdata.E_in[1])**2)/(cfiber.glova.symbolrate*cfiber.glova.sps)
# energy_out = np.sum(np.abs(cdata.E_out[0])**2 + np.abs(cdata.E_out[1])**2)/(cfiber.glova.symbolrate*cfiber.glova.sps)
# print(f"Energy in: {energy_in} J")
# print(f"Energy out: {energy_out} J")
# save_data(cdata, config)
# length_loop(config, lengths)
# fig, axs = plt.subplots(2, 1, sharex=True, sharey=True)
# xax = np.linspace(0, cfiber.glova.nos, cfiber.glova.nos*cfiber.glova.sps)-0.5
# axs[0].plot(xax,np.abs(cdata.E_in[0])**2)
# axs[0].plot(xax,np.abs(cdata.E_out[0])**2)
# axs[0].set_title("x")
# axs[0].grid()
# axs[1].plot(xax,np.abs(cdata.E_in[1])**2)
# axs[1].plot(xax,np.abs(cdata.E_out[1])**2)
# axs[1].set_title("y")
# axs[1].grid()
# plt.show()
single_run_with_plot(config)