From e20aa9bfb1fed18c74bdada8860433627a2769be Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Joseph=20Hopfm=C3=BCller?=
Date: Mon, 2 Dec 2024 18:48:43 +0100
Subject: [PATCH] add eye_diagram analysis

---
 src/single-core-regen/util/eye_diagram.py | 418 ++++++++++++++++++++++
 1 file changed, 418 insertions(+)
 create mode 100644 src/single-core-regen/util/eye_diagram.py

diff --git a/src/single-core-regen/util/eye_diagram.py b/src/single-core-regen/util/eye_diagram.py
new file mode 100644
index 0000000..b07047f
--- /dev/null
+++ b/src/single-core-regen/util/eye_diagram.py
@@ -0,0 +1,418 @@
+from matplotlib import pyplot as plt
+from matplotlib.colors import LinearSegmentedColormap
+import numpy as np
+from scipy.cluster.vq import kmeans2
+import warnings
+
+from rich.traceback import install
+from rich import pretty
+from rich import print
+
+install()
+pretty.install()
+
+
+def generate_sample_data(n_symbols=1000, sps=128, noise=0.01, skew=1):
+    data = create_symbol_sequence(n_symbols, skew=skew)
+    signal = generate_signal(data, sps)
+    signal = normalization_with_noise(signal, noise)
+
+    xaxis = np.arange(0, len(signal)) / sps
+    return np.vstack([xaxis, signal])
+
+def create_symbol_sequence(n_symbols, skew=1):
+    np.random.seed(42)
+    data = np.random.randint(0, 4, n_symbols) / 4
+    data = np.power(data, skew)
+    return tuple(data)
+
+
+def generate_signal(data, sps):
+    working_data = np.diff(data, prepend=data[0])
+    data_padded = np.zeros(len(data) * sps)
+    data_padded[::sps] = working_data
+    data_padded = np.pad(data_padded, (0, sps // 2), mode="constant")
+
+    wavelet = generate_wavelet(sps, oversample=3)
+
+    signal = np.convolve(data_padded, wavelet)
+    signal = np.cumsum(signal)
+    signal = signal[sps // 2 + len(wavelet) // 2 - 1 : -len(wavelet) // 2]
+
+    return signal
+
+
+def normalization_with_noise(signal, noise=0):
+    if noise > 0:
+        awgn = np.random.normal(0, noise * (np.max(signal) - np.min(signal)), len(signal))
+        signal += awgn
+
+    # min-max normalization
+    signal = signal - np.min(signal)
+    signal = signal / np.max(signal)
+    return signal
+
+
+def generate_wavelet(sps, oversample=3):
+    sample_points = np.linspace(
+        -oversample * sps,
+        oversample * sps,
+        2 * oversample * sps,
+        endpoint=True,
+    )
+    sigma = 0.33 / (1 * np.sqrt(2 * np.log(2))) * sps
+    pulse = 1 / (sigma * np.sqrt(2 * np.pi)) * np.exp(-np.square(sample_points) / (2 * np.square(sigma)))
+
+    return pulse
+
+
+class eye_diagram:
+    def __init__(self, data, *, channel_names=None, horizontal_bins=256, vertical_bins=1000, n_levels=4):
+        # data has shape [channels, 2, samples]
+        # each sample has a timestamp and a value
+        if data.ndim == 2:
+            data = data[np.newaxis, :, :]
+        self.channel_names = channel_names
+        self.raw_data = data
+        self.channels = data.shape[0]
+        self.n_levels = n_levels
+        self.eye_stats = [{"success": False} for _ in range(self.channels)]
+        self.horizontal_bins = horizontal_bins
+        self.vertical_bins = vertical_bins
+        self.eye_built = False
+        self.analyse(self.n_levels)
+
+    def generate_eye_data(self):
+        self.x_bins = np.linspace(0, 2, self.horizontal_bins, endpoint=False)
+        self.y_bins = np.zeros((self.channels, self.vertical_bins))
+        self.eye_data = np.zeros((self.channels, self.vertical_bins, self.horizontal_bins))
+        for i in range(self.channels):
+            data_min = np.min(self.raw_data[i, 1, :])
+            data_max = np.max(self.raw_data[i, 1, :])
+            self.y_bins[i] = np.linspace(data_min, data_max, self.vertical_bins, endpoint=False)
+
+            t_vals = self.raw_data[i, 0, :] % 2
+            val_vals = self.raw_data[i, 1, :]
+
+            x_indices = np.digitize(t_vals, self.x_bins) - 1
+            y_indices = np.digitize(val_vals, self.y_bins[i]) - 1
+
+            np.add.at(self.eye_data[i], (y_indices, x_indices), 1)
+        self.eye_built = True
+
+    def plot(self, title="Eye Diagram", stats=True, show=True):
+        if not self.eye_built:
+            self.generate_eye_data()
+        cmap = LinearSegmentedColormap.from_list(
+            "eyemap",
+            [(0, "white"), (0.1, "blue"), (0.2, "cyan"), (0.5, "green"), (0.8, "yellow"), (0.9, "red"), (1, "magenta")],
+        )
+        if self.channels % 2 == 0:
+            rows = 2
+            cols = self.channels // 2
+        else:
+            cols = int(np.ceil(np.sqrt(self.channels)))
+            rows = int(np.ceil(self.channels / cols))
+        fig, ax = plt.subplots(rows, cols, sharex=True, sharey=False)
+        fig.suptitle(title)
+        ax = np.atleast_1d(ax).transpose().flatten()
+        for i in range(self.channels):
+            ax[i].set_title(self.channel_names[i] if self.channel_names is not None else f"Channel {i+1}")
+            ax[i].set_xlabel("Symbol")
+            ax[i].set_ylabel("Amplitude")
+            ax[i].grid()
+            ax[i].imshow(
+                self.eye_data[i],
+                origin="lower",
+                aspect="auto",
+                cmap=cmap,
+                extent=[self.x_bins[0], self.x_bins[-1], self.y_bins[i][0], self.y_bins[i][-1]],
+            )
+            ax[i].set_xlim((self.x_bins[0], self.x_bins[-1]))
+            ymin = np.min(self.y_bins[:, 0])
+            ymax = np.max(self.y_bins[:, -1])
+            yspan = ymax - ymin
+            ax[i].set_ylim((ymin - 0.1 * yspan, ymax + 0.1 * yspan))
+            if stats and self.eye_stats[i]["success"]:
+                ax[i].plot([0, 2], [self.eye_stats[i]["levels"], self.eye_stats[i]["levels"]], "k--")
+                ax[i].set_yticks(self.eye_stats[i]["levels"])
+                # add arrows for amplitudes
+                for j in range(len(self.eye_stats[i]["amplitudes"])):
+                    ax[i].annotate(
+                        "",
+                        xy=(0.05, self.eye_stats[i]["levels"][j]),
+                        xytext=(0.05, self.eye_stats[i]["levels"][j + 1]),
+                        arrowprops=dict(arrowstyle="<->", facecolor="black"),
+                    )
+                    ax[i].annotate(
+                        f"{self.eye_stats[i]['amplitudes'][j]:.2e}",
+                        xy=(0.06, (self.eye_stats[i]["levels"][j] + self.eye_stats[i]["levels"][j + 1]) / 2),
+                    )
+                # add arrows for eye heights
+                for j in range(len(self.eye_stats[i]["heights"])):
+                    try:
+                        bot = np.max(self.eye_stats[i]["amplitude_clusters"][j])
+                        top = np.min(self.eye_stats[i]["amplitude_clusters"][j + 1])
+
+                        ax[i].annotate(
+                            "",
+                            xy=(self.eye_stats[i]["time_midpoint"], bot),
+                            xytext=(self.eye_stats[i]["time_midpoint"], top),
+                            arrowprops=dict(arrowstyle="<->", facecolor="black"),
+                        )
+                        ax[i].annotate(
+                            f"{self.eye_stats[i]['heights'][j]:.2e}",
+                            xy=(self.eye_stats[i]["time_midpoint"] + 0.015, (bot + top) / 2 + 0.04),
+                        )
+                    except (ValueError, IndexError):
+                        pass
+                # add arrows for eye widths
+                for j in range(len(self.eye_stats[i]["widths"])):
+                    try:
+                        left = np.max(self.eye_stats[i]["time_clusters"][j][0])
+                        right = np.min(self.eye_stats[i]["time_clusters"][j][1])
+                        vertical = (self.eye_stats[i]["levels"][j] + self.eye_stats[i]["levels"][j + 1]) / 2
+
+                        ax[i].annotate(
+                            "",
+                            xy=(left, vertical),
+                            xytext=(right, vertical),
+                            arrowprops=dict(arrowstyle="<->", facecolor="black"),
+                        )
+                        ax[i].annotate(
+                            f"{self.eye_stats[i]['widths'][j]:.2e}",
+                            xy=((left + right) / 2 - 0.15, vertical + 0.01),
+                        )
+                    except (ValueError, IndexError):
+                        pass
+
+                # add area
+                for j in range(len(self.eye_stats[i]["areas"])):
+                    horizontal = self.eye_stats[i]["time_midpoint"]
+                    vertical = (self.eye_stats[i]["levels"][j] + self.eye_stats[i]["levels"][j + 1]) / 2
+                    ax[i].annotate(
+                        f"{self.eye_stats[i]['areas'][j]:.2e}",
+                        xy=(horizontal + 0.035, vertical - 0.07),
+                    )
+
+                # add min_area above the plot
+                ax[i].annotate(
+                    f"Min Area: {self.eye_stats[i]['min_area']:.2e}",
+                    xy=(0.05, ymax + 0.05 * yspan),
+                    # xycoords="axes fraction",
+                    ha="left",
+                    va="center",
+                )
+        fig.tight_layout()
+
+        if show:
+            plt.show()
+        return fig
+
+    def analyse(self, n_levels=4):
+        warnings.filterwarnings("error")
+        for i in range(self.channels):
+            self.eye_stats[i]["channel"] = str(i+1) if self.channel_names is None else self.channel_names[i]
+            try:
+                approx_levels = eye_diagram.approximate_levels(self.raw_data[i], n_levels)
+
+                time_bounds = eye_diagram.calculate_time_bounds(self.raw_data[i], approx_levels)
+
+                self.eye_stats[i]["time_midpoint"] = (time_bounds[0] + time_bounds[1]) / 2
+
+                self.eye_stats[i]["levels"], self.eye_stats[i]["amplitude_clusters"] = eye_diagram.calculate_levels(
+                    self.raw_data[i], approx_levels, time_bounds
+                )
+
+                self.eye_stats[i]["amplitudes"] = np.diff(self.eye_stats[i]["levels"])
+
+                self.eye_stats[i]["heights"] = eye_diagram.calculate_eye_heights(
+                    self.eye_stats[i]["amplitude_clusters"]
+                )
+
+                self.eye_stats[i]["widths"], self.eye_stats[i]["time_clusters"] = eye_diagram.calculate_eye_widths(
+                    self.raw_data[i], self.eye_stats[i]["levels"]
+                )
+
+                # # check if time clusters are valid (upper bound > time_midpoint > lower bound)
+                # # if not: raise ValueError
+                # for j in range(len(self.eye_stats[i]['time_clusters'])):
+                #     if not (np.max(self.eye_stats[i]['time_clusters'][j][0]) < self.eye_stats[i]["time_midpoint"] < np.min(self.eye_stats[i]['time_clusters'][j][1])):
+                #         raise ValueError
+
+                self.eye_stats[i]["areas"] = self.eye_stats[i]["heights"] * self.eye_stats[i]["widths"]
+                self.eye_stats[i]["mean_area"] = np.mean(self.eye_stats[i]["areas"])
+                self.eye_stats[i]["min_area"] = np.min(self.eye_stats[i]["areas"])
+
+                self.eye_stats[i]["success"] = True
+            except (RuntimeWarning, UserWarning, ValueError):
+                self.eye_stats[i]["success"] = False
+                self.eye_stats[i]["time_midpoint"] = 0
+                self.eye_stats[i]["levels"] = np.zeros(n_levels)
+                self.eye_stats[i]["amplitude_clusters"] = []
+                self.eye_stats[i]["amplitudes"] = np.zeros(n_levels - 1)
+                self.eye_stats[i]["heights"] = np.zeros(n_levels - 1)
+                self.eye_stats[i]["widths"] = np.zeros(n_levels - 1)
+                self.eye_stats[i]["areas"] = np.zeros(n_levels - 1)
+                self.eye_stats[i]["mean_area"] = 0
+                self.eye_stats[i]["min_area"] = 0
+
+        warnings.resetwarnings()
+
+    @staticmethod
+    def approximate_levels(data, levels):
+        amplitudes = data[1]
+        grouping_data = amplitudes.reshape(-1, 1)
+
+        kmeans, clusters = eye_diagram.kmeans_cluster(grouping_data, levels)
+
+        centroids = np.zeros(levels)
+        for i in range(levels):
+            centroids[i] = eye_diagram.shorth(clusters[i])
+
+        return np.sort(centroids)
+
+    @staticmethod
+    def kmeans_cluster(data, levels):
+        working_data = data.reshape(-1, 1)
+        # initial = np.linspace(np.min(working_data), np.max(working_data), levels).reshape(-1, 1)
+        kmeans = kmeans2(working_data, levels, iter=100, minit="++")
+
+        order = np.argsort(kmeans[0].squeeze())
+        kmeans[0][:] = kmeans[0][order]
+        order = np.argsort(order)
+        kmeans[1][:] = order[kmeans[1]]
+
+        clusters = [[] for _ in range(levels)]
+        for i, elem in enumerate(data):
+            clusters[kmeans[1][i]].append(elem.squeeze())
+        clusters = [np.array(cluster) for cluster in clusters]
+
+        # clusters = [clusters[i] for i in order]
+
+        return kmeans, clusters
+
+    @staticmethod
+    def shorth(data):
+        working_data = np.sort(data)
+        n = len(working_data)
+        h = n // 2 + 1
+        min_diff = np.inf
+        interval = np.zeros(2)
+        for i in range(n - h):
+            diff = working_data[i + h] - working_data[i]
+            if diff < min_diff:
+                min_diff = diff
+                interval = [working_data[i], working_data[i + h]]
+        return np.mean(interval)
+
+    @staticmethod
+    def calculate_time_bounds(data, level_centroids):
+        n_levels = 2
+
+        # prepare data
+        selection_range = eye_diagram.calc_selection_range(level_centroids[1:3], 0.01)
+
+        # times = np.arange(0, len(data), dtype=np.float32)
+        times, amplitudes = data
+        grouping_data = times[(amplitudes > selection_range[0]) & (amplitudes < selection_range[1])]
+        grouping_data = grouping_data % 2
+        grouping_data = grouping_data.reshape(-1, 1)
+
+        kmeans, clusters = eye_diagram.kmeans_cluster(grouping_data, n_levels)
+
+        # time_midpoint = (np.min(clusters[1]) + np.max(clusters[0]))/2
+
+        # # check if time clusters are valid (upper bound > time_midpoint > lower bound)
+        # # if not: raise ValueError
+        # if not (np.max(clusters[0]) < time_midpoint < np.min(clusters[1])):
+        #     raise ValueError
+
+        return np.min(clusters[1]), np.max(clusters[0])
+
+    @staticmethod
+    def calc_selection_range(data, tolerance):
+        middle = np.mean(data)
+        tol = tolerance * np.abs(np.diff(data))
+        return (middle - tol, middle + tol)
+
+    @staticmethod
+    def calculate_levels(data, level_centroids, time_bounds):
+        selection_range = eye_diagram.calc_selection_range(time_bounds, 0.025)
+
+        times, amplitudes = data
+        indices = np.arange(0, len(times))
+        filtered_time = indices[((times % 2) > selection_range[0]) & ((times % 2) < selection_range[1])]
+        filtered_data = amplitudes[filtered_time]
+
+        vertical_bounds = np.array([
+            -np.inf,
+            *[(level_centroids[i] + level_centroids[i + 1]) / 2 for i in range(len(level_centroids) - 1)],
+            np.inf,
+        ])
+
+        central_level_means = np.zeros(len(level_centroids))
+        amplitude_clusters = []
+        for i in range(len(level_centroids)):
+            amplitude_filtered_data = filtered_data[
+                (filtered_data > vertical_bounds[i]) & (filtered_data < vertical_bounds[i + 1])
+            ]
+            amplitude_clusters.append(amplitude_filtered_data)
+            central_level_means[i] = np.mean(amplitude_filtered_data)
+
+        # # check if amplitude clusters are valid (upper bound > level_midpoint > lower bound)
+        # # if not: raise ValueError
+        # for j in range(len(amplitude_clusters)):
+        #     level_midpoint = (central_level_means[j] + central_level_means[j+1]) / 2
+        #     if not (np.max(amplitude_clusters[0]) < level_midpoint < np.min(amplitude_clusters[1])):
+        #         raise ValueError
+
+        return central_level_means, amplitude_clusters
+
+    @staticmethod
+    def calculate_eye_heights(amplitude_clusters):
+        eye_heights = np.zeros(len(amplitude_clusters) - 1)
+        for i in range(len(amplitude_clusters) - 1):
+            eye_heights[i] = np.min(amplitude_clusters[i + 1]) - np.max(amplitude_clusters[i])
+        return eye_heights
+
+    @staticmethod
+    def calculate_eye_widths(data, central_level_means):
+        n_levels = len(central_level_means)
+
+        widths = np.zeros(n_levels - 1)
+
+        times, amplitudes = data
+        clusters = []
+        for i in range(n_levels - 1):
+            selection_range = eye_diagram.calc_selection_range(
+                [central_level_means[i], central_level_means[i + 1]], 0.01
+            )
+            grouping_data = times[(amplitudes > selection_range[0]) & (amplitudes < selection_range[1])]
+            grouping_data = grouping_data % 2
+            grouping_data = grouping_data.reshape(-1, 1)
+            kmeans, cluster = eye_diagram.kmeans_cluster(grouping_data, 2)
+            clusters.append(cluster)
+            widths[i] = np.min(cluster[1]) - np.max(cluster[0])
+            ...
+        return widths, clusters
+
+
+if __name__ == "__main__":
+    length = int(2**14)
+    # data = generate_sample_data(length, noise=1)
+    # data1 = generate_sample_data(length, noise=0.01)
+    # data2 = generate_sample_data(length, noise=0.01, skew=1.2)
+    # data3 = generate_sample_data(length, noise=0.02)
+
+    # data = np.stack([data, data1, data2, data3])
+
+    data = generate_sample_data(length, noise=0.005)
+    eye = eye_diagram(data, horizontal_bins=256, vertical_bins=256)
+    attrs = ("success", "amplitudes", "time_midpoint", "levels", "heights", "widths", "areas", "mean_area", "min_area")
+    for i, channel in enumerate(eye.eye_stats):
+        print(f"Channel {i + 1}")
+        print_data = {attr: channel[attr] for attr in attrs}
+        print(print_data)
+
+    eye.plot()