add eye_diagram analysis

This commit is contained in:
Joseph Hopfmüller
2024-12-02 18:48:43 +01:00
parent c145d58df8
commit e20aa9bfb1

View File

@@ -0,0 +1,418 @@
from matplotlib import pyplot as plt
from matplotlib.colors import LinearSegmentedColormap
import numpy as np
from scipy.cluster.vq import kmeans2
import warnings
from rich.traceback import install
from rich import pretty
from rich import print
install()
pretty.install()
def generate_sample_data(n_symbols=1000, sps=128, noise=0.01, skew=1):
data = create_symbol_sequence(n_symbols, skew=skew)
signal = generate_signal(data, sps)
signal = normalization_with_noise(signal, noise)
xaxis = np.arange(0, len(signal)) / sps
return np.vstack([xaxis, signal])
def create_symbol_sequence(n_symbols, skew=1):
np.random.seed(42)
data = np.random.randint(0, 4, n_symbols) / 4
data = np.pow(data, skew)
return tuple(data)
def generate_signal(data, sps):
working_data = np.diff(data, prepend=data[0])
data_padded = np.zeros(len(data) * sps)
data_padded[::sps] = working_data
data_padded = np.pad(data_padded, (0, sps // 2), mode="constant")
wavelet = generate_wavelet(sps, oversample=3)
signal = np.convolve(data_padded, wavelet)
signal = np.cumsum(signal)
signal = signal[sps // 2 + len(wavelet) // 2 - 1 : -len(wavelet) // 2]
return signal
def normalization_with_noise(signal, noise=0):
if noise > 0:
awgn = np.random.normal(0, noise * (np.max(signal) - np.min(signal)), len(signal))
signal += awgn
# min-max normalization
signal = signal - np.min(signal)
signal = signal / np.max(signal)
return signal
def generate_wavelet(sps, oversample=3):
sample_points = np.linspace(
-oversample * sps,
oversample * sps,
2 * oversample * sps,
endpoint=True,
)
sigma = 0.33 / (1 * np.sqrt(2 * np.log(2))) * sps
pulse = 1 / (sigma * np.sqrt(2 * np.pi)) * np.exp(-np.square(sample_points) / (2 * np.square(sigma)))
return pulse
class eye_diagram:
def __init__(self, data, *, channel_names=None, horizontal_bins=256, vertical_bins=1000, n_levels=4):
# data has shape [channels, 2, samples]
# each sample has a timestamp and a value
if data.ndim == 2:
data = data[np.newaxis, :, :]
self.channel_names = channel_names
self.raw_data = data
self.channels = data.shape[0]
self.n_levels = n_levels
self.eye_stats = [{"success": False} for _ in range(self.channels)]
self.horizontal_bins = horizontal_bins
self.vertical_bins = vertical_bins
self.eye_built = False
self.analyse(self.n_levels)
def generate_eye_data(self):
self.x_bins = np.linspace(0, 2, self.horizontal_bins, endpoint=False)
self.y_bins = np.zeros((self.channels, self.vertical_bins))
self.eye_data = np.zeros((self.channels, self.vertical_bins, self.horizontal_bins))
for i in range(self.channels):
data_min = np.min(self.raw_data[i, 1, :])
data_max = np.max(self.raw_data[i, 1, :])
self.y_bins[i] = np.linspace(data_min, data_max, self.vertical_bins, endpoint=False)
t_vals = self.raw_data[i, 0, :] % 2
val_vals = self.raw_data[i, 1, :]
x_indices = np.digitize(t_vals, self.x_bins) - 1
y_indices = np.digitize(val_vals, self.y_bins[i]) - 1
np.add.at(self.eye_data[i], (y_indices, x_indices), 1)
self.eye_built = True
def plot(self, title="Eye Diagram", stats=True, show=True):
if not self.eye_built:
self.generate_eye_data()
cmap = LinearSegmentedColormap.from_list(
"eyemap",
[(0, "white"), (0.1, "blue"), (0.2, "cyan"), (0.5, "green"), (0.8, "yellow"), (0.9, "red"), (1, "magenta")],
)
if self.channels % 2 == 0:
rows = 2
cols = self.channels // 2
else:
cols = int(np.ceil(np.sqrt(self.channels)))
rows = int(np.ceil(self.channels / cols))
fig, ax = plt.subplots(rows, cols, sharex=True, sharey=False)
fig.suptitle(title)
ax = np.atleast_1d(ax).transpose().flatten()
for i in range(self.channels):
ax[i].set_title(self.channel_names[i] if self.channel_names is not None else f"Channel {i+1}")
ax[i].set_xlabel("Symbol")
ax[i].set_ylabel("Amplitude")
ax[i].grid()
ax[i].imshow(
self.eye_data[i],
origin="lower",
aspect="auto",
cmap=cmap,
extent=[self.x_bins[0], self.x_bins[-1], self.y_bins[i][0], self.y_bins[i][-1]],
)
ax[i].set_xlim((self.x_bins[0], self.x_bins[-1]))
ymin = np.min(self.y_bins[:, 0])
ymax = np.max(self.y_bins[:, -1])
yspan = ymax - ymin
ax[i].set_ylim((ymin - 0.1 * yspan, ymax + 0.1 * yspan))
if stats and self.eye_stats[i]["success"]:
ax[i].plot([0, 2], [self.eye_stats[i]["levels"], self.eye_stats[i]["levels"]], "k--")
ax[i].set_yticks(self.eye_stats[i]["levels"])
# add arrows for amplitudes
for j in range(len(self.eye_stats[i]["amplitudes"])):
ax[i].annotate(
"",
xy=(0.05, self.eye_stats[i]["levels"][j]),
xytext=(0.05, self.eye_stats[i]["levels"][j + 1]),
arrowprops=dict(arrowstyle="<->", facecolor="black"),
)
ax[i].annotate(
f"{self.eye_stats[i]['amplitudes'][j]:.2e}",
xy=(0.06, (self.eye_stats[i]["levels"][j] + self.eye_stats[i]["levels"][j + 1]) / 2),
)
# add arrows for eye heights
for j in range(len(self.eye_stats[i]["heights"])):
try:
bot = np.max(self.eye_stats[i]["amplitude_clusters"][j])
top = np.min(self.eye_stats[i]["amplitude_clusters"][j + 1])
ax[i].annotate(
"",
xy=(self.eye_stats[i]["time_midpoint"], bot),
xytext=(self.eye_stats[i]["time_midpoint"], top),
arrowprops=dict(arrowstyle="<->", facecolor="black"),
)
ax[i].annotate(
f"{self.eye_stats[i]['heights'][j]:.2e}",
xy=(self.eye_stats[i]["time_midpoint"] + 0.015, (bot + top) / 2 + 0.04),
)
except (ValueError, IndexError):
pass
# add arrows for eye widths
for j in range(len(self.eye_stats[i]["widths"])):
try:
left = np.max(self.eye_stats[i]["time_clusters"][j][0])
right = np.min(self.eye_stats[i]["time_clusters"][j][1])
vertical = (self.eye_stats[i]["levels"][j] + self.eye_stats[i]["levels"][j + 1]) / 2
ax[i].annotate(
"",
xy=(left, vertical),
xytext=(right, vertical),
arrowprops=dict(arrowstyle="<->", facecolor="black"),
)
ax[i].annotate(
f"{self.eye_stats[i]['widths'][j]:.2e}",
xy=((left + right) / 2 - 0.15, vertical + 0.01),
)
except (ValueError, IndexError):
pass
# add area
for j in range(len(self.eye_stats[i]["areas"])):
horizontal = self.eye_stats[i]["time_midpoint"]
vertical = (self.eye_stats[i]["levels"][j] + self.eye_stats[i]["levels"][j + 1]) / 2
ax[i].annotate(
f"{self.eye_stats[i]['areas'][j]:.2e}",
xy=(horizontal + 0.035, vertical - 0.07),
)
# add min_area above the plot
ax[i].annotate(
f"Min Area: {self.eye_stats[i]['min_area']:.2e}",
xy=(0.05, ymax + 0.05 * yspan),
# xycoords="axes fraction",
ha="left",
va="center",
)
fig.tight_layout()
if show:
plt.show()
return fig
def analyse(self, n_levels=4):
warnings.filterwarnings("error")
for i in range(self.channels):
self.eye_stats[i]["channel"] = str(i+1) if self.channel_names is None else self.channel_names[i]
try:
approx_levels = eye_diagram.approximate_levels(self.raw_data[i], n_levels)
time_bounds = eye_diagram.calculate_time_bounds(self.raw_data[i], approx_levels)
self.eye_stats[i]["time_midpoint"] = (time_bounds[0] + time_bounds[1]) / 2
self.eye_stats[i]["levels"], self.eye_stats[i]["amplitude_clusters"] = eye_diagram.calculate_levels(
self.raw_data[i], approx_levels, time_bounds
)
self.eye_stats[i]["amplitudes"] = np.diff(self.eye_stats[i]["levels"])
self.eye_stats[i]["heights"] = eye_diagram.calculate_eye_heights(
self.eye_stats[i]["amplitude_clusters"]
)
self.eye_stats[i]["widths"], self.eye_stats[i]["time_clusters"] = eye_diagram.calculate_eye_widths(
self.raw_data[i], self.eye_stats[i]["levels"]
)
# # check if time clusters are valid (upper bound > time_midpoint > lower bound)
# # if not: raise ValueError
# for j in range(len(self.eye_stats[i]['time_clusters'])):
# if not (np.max(self.eye_stats[i]['time_clusters'][j][0]) < self.eye_stats[i]["time_midpoint"] < np.min(self.eye_stats[i]['time_clusters'][j][1])):
# raise ValueError
self.eye_stats[i]["areas"] = self.eye_stats[i]["heights"] * self.eye_stats[i]["widths"]
self.eye_stats[i]["mean_area"] = np.mean(self.eye_stats[i]["areas"])
self.eye_stats[i]["min_area"] = np.min(self.eye_stats[i]["areas"])
self.eye_stats[i]["success"] = True
except (RuntimeWarning, UserWarning, ValueError):
self.eye_stats[i]["success"] = False
self.eye_stats[i]["time_midpoint"] = 0
self.eye_stats[i]["levels"] = np.zeros(n_levels)
self.eye_stats[i]["amplitude_clusters"] = []
self.eye_stats[i]["amplitudes"] = np.zeros(n_levels - 1)
self.eye_stats[i]["heights"] = np.zeros(n_levels - 1)
self.eye_stats[i]["widths"] = np.zeros(n_levels - 1)
self.eye_stats[i]["areas"] = np.zeros(n_levels - 1)
self.eye_stats[i]["mean_area"] = 0
self.eye_stats[i]["min_area"] = 0
warnings.resetwarnings()
@staticmethod
def approximate_levels(data, levels):
amplitudes = data[1]
grouping_data = amplitudes.reshape(-1, 1)
kmeans, clusters = eye_diagram.kmeans_cluster(grouping_data, levels)
centroids = np.zeros(levels)
for i in range(levels):
centroids[i] = eye_diagram.shorth(clusters[i])
return np.sort(centroids)
@staticmethod
def kmeans_cluster(data, levels):
working_data = data.reshape(-1, 1)
# initial = np.linspace(np.min(working_data), np.max(working_data), levels).reshape(-1, 1)
kmeans = kmeans2(working_data, levels, iter=100, minit="++")
order = np.argsort(kmeans[0].squeeze())
kmeans[0][:] = kmeans[0][order]
order = np.argsort(order)
kmeans[1][:] = order[kmeans[1]]
clusters = [[] for _ in range(levels)]
for i, elem in enumerate(data):
clusters[kmeans[1][i]].append(elem.squeeze())
clusters = [np.array(cluster) for cluster in clusters]
# clusters = [clusters[i] for i in order]
return kmeans, clusters
@staticmethod
def shorth(data):
working_data = np.sort(data)
n = len(working_data)
h = n // 2 + 1
min_diff = np.inf
interval = np.zeros(2)
for i in range(n - h):
diff = working_data[i + h] - working_data[i]
if diff < min_diff:
min_diff = diff
interval = [working_data[i], working_data[i + h]]
return np.mean(interval)
@staticmethod
def calculate_time_bounds(data, level_centroids):
n_levels = 2
# prepare data
selection_range = eye_diagram.calc_selection_range(level_centroids[1:3], 0.01)
# times = np.arange(0, len(data), dtype=np.float32)
times, amplitudes = data
grouping_data = times[(amplitudes > selection_range[0]) & (amplitudes < selection_range[1])]
grouping_data = grouping_data % 2
grouping_data = grouping_data.reshape(-1, 1)
kmeans, clusters = eye_diagram.kmeans_cluster(grouping_data, n_levels)
# time_midpoint = (np.min(clusters[1]) + np.max(clusters[0]))/2
# # check if time clusters are valid (upper bound > time_midpoint > lower bound)
# # if not: raise ValueError
# if not (np.max(clusters[0]) < time_midpoint < np.min(clusters[1])):
# raise ValueError
return np.min(clusters[1]), np.max(clusters[0])
@staticmethod
def calc_selection_range(data, tolerance):
middle = np.mean(data)
tol = tolerance * np.abs(np.diff(data))
return (middle - tol, middle + tol)
@staticmethod
def calculate_levels(data, level_centroids, time_bounds):
selection_range = eye_diagram.calc_selection_range(time_bounds, 0.025)
times, amplitudes = data
indices = np.arange(0, len(times))
filtered_time = indices[((times % 2) > selection_range[0]) & ((times % 2) < selection_range[1])]
filtered_data = amplitudes[filtered_time]
vertical_bounds = np.array([
-np.inf,
*[(level_centroids[i] + level_centroids[i + 1]) / 2 for i in range(len(level_centroids) - 1)],
np.inf,
])
central_level_means = np.zeros(len(level_centroids))
amplitude_clusters = []
for i in range(len(level_centroids)):
amplitude_filtered_data = filtered_data[
(filtered_data > vertical_bounds[i]) & (filtered_data < vertical_bounds[i + 1])
]
amplitude_clusters.append(amplitude_filtered_data)
central_level_means[i] = np.mean(amplitude_filtered_data)
# # check if amplitude clusters are valid (upper bound > level_midpoint > lower bound)
# # if not: raise ValueError
# for j in range(len(amplitude_clusters)):
# level_midpoint = (central_level_means[j] + central_level_means[j+1]) / 2
# if not (np.max(amplitude_clusters[0]) < level_midpoint < np.min(amplitude_clusters[1])):
# raise ValueError
return central_level_means, amplitude_clusters
@staticmethod
def calculate_eye_heights(amplitude_clusters):
eye_heights = np.zeros(len(amplitude_clusters) - 1)
for i in range(len(amplitude_clusters) - 1):
eye_heights[i] = np.min(amplitude_clusters[i + 1]) - np.max(amplitude_clusters[i])
return eye_heights
@staticmethod
def calculate_eye_widths(data, central_level_means):
n_levels = len(central_level_means)
widths = np.zeros(n_levels - 1)
times, amplitudes = data
clusters = []
for i in range(n_levels - 1):
selection_range = eye_diagram.calc_selection_range(
[central_level_means[i], central_level_means[i + 1]], 0.01
)
grouping_data = times[(amplitudes > selection_range[0]) & (amplitudes < selection_range[1])]
grouping_data = grouping_data % 2
grouping_data = grouping_data.reshape(-1, 1)
kmeans, cluster = eye_diagram.kmeans_cluster(grouping_data, 2)
clusters.append(cluster)
widths[i] = np.min(cluster[1]) - np.max(cluster[0])
...
return widths, clusters
if __name__ == "__main__":
length = int(2**14)
# data = generate_sample_data(length, noise=1)
# data1 = generate_sample_data(length, noise=0.01)
# data2 = generate_sample_data(length, noise=0.01, skew=1.2)
# data3 = generate_sample_data(length, noise=0.02)
# data = np.stack([data, data1, data2, data3])
data = generate_sample_data(length, noise=0.005)
eye = eye_diagram(data, horizontal_bins=256, vertical_bins=256)
attrs = ("success", "amplitudes", "time_midpoint", "levels", "heights", "widths", "area", "mean_area", "min_area")
for i, channel in enumerate(eye.eye_stats):
print(f"Channel {i}")
print_data = {attr: channel[attr] for attr in attrs}
print(print_data)
eye.plot()