From a96150510ad4cbe45e5dc5374065ad449396bec9 Mon Sep 17 00:00:00 2001 From: mbertuletti Date: Mon, 25 Nov 2024 18:26:07 +0100 Subject: [PATCH] [software] Merge libraries for data generation --- software/data/README.md | 24 +- software/data/gendata_header.py | 211 ++++++-------- software/data/gendatalib.py | 223 +++++++++++++++ software/data/gendatalib_blas.py | 127 --------- software/data/gendatalib_cfft.py | 104 ------- software/data/gendatalib_chest.py | 77 ------ software/data/qmath.py | 446 ++++++++++++++++++++++++++++++ 7 files changed, 777 insertions(+), 435 deletions(-) create mode 100644 software/data/gendatalib.py delete mode 100644 software/data/gendatalib_blas.py delete mode 100644 software/data/gendatalib_cfft.py delete mode 100755 software/data/gendatalib_chest.py create mode 100644 software/data/qmath.py diff --git a/software/data/README.md b/software/data/README.md index 3638c747c..9fdab87cf 100644 --- a/software/data/README.md +++ b/software/data/README.md @@ -1,11 +1,29 @@ # Data Generation Data for mempool applications is generated with the `gendata_header.py` script. -The `gendata_*.py` libaries contain the golden models for the applications under test. -The application parameters are passed to the script by means of the `gendata_params.hjson` file. +The `gendatalib.py` library generates random inputs and a reference golden model for the applications under test. +The application parameters are passed to the script with the `gendata_params.hjson` file. + +An example entry follows: `matmul_f32` is the name of the MemPool application under test, the `type` refers to the numpy data type, the `defines` are application parameters, turned into C constant declarations in the form `#define matrix_M (16)`, the `arrays` encode the C-type and name of input vectors for the application under test. 
+ +` + "matmul_f32": { + "type": "float32", + "defines": [ + ("matrix_M", 16) + ("matrix_N", 16) + ("matrix_P", 16) + ] + "arrays": [ + ("float", "l2_A") + ("float", "l2_B") + ("float", "l2_C") + ] + } +` ## To test a new application: If a new application requires to be tested with data generated from a reference golden model: -- Add a new golden model to the existing libraries, or create a new one. +- Add a new golden model to the existing library `gendatalib.py`. - Add a golden model function call to the `gendata_header.py`. - Add a new item in the `gendata_params.hjson` to make function parameters configurable. diff --git a/software/data/gendata_header.py b/software/data/gendata_header.py index 49bd6396f..44749a4a0 100644 --- a/software/data/gendata_header.py +++ b/software/data/gendata_header.py @@ -9,14 +9,11 @@ import argparse import os -import math import hjson import ast import numpy -import gendatalib_cfft as cfft -import gendatalib_chest as chest -import gendatalib_blas as blas +import gendatalib as datalib header = """\ @@ -29,34 +26,58 @@ """ -def print_array(arr, typ, name): - +def format_type(typ, value): + """ + formats the type for printing in .h file. + :param typ: Input type + :param value: Input_value + """ typ_i32b = ["int32_t", "uint32_t"] typ_i16b = ["int16_t", "uint16_t"] typ_i8b = ["int8_t", "uint8_t"] + if typ in typ_i32b: + stringyfied_val = '({}) 0X{:08X}'.format(typ, value & 0xffffffff) + elif typ in typ_i16b: + stringyfied_val = '({}) 0X{:04X}'.format(typ, value & 0x0000ffff) + elif typ in typ_i8b: + stringyfied_val = '({}) 0X{:02X}'.format(typ, value & 0x000000ff) + elif typ == 'float': + stringyfied_val = '({}) {:+.8f}'.format(typ, value) + elif typ == '__fp16': + stringyfied_val = '({}) {:+.4f}'.format(typ, value) + else: + raise Exception("ERROR: Unsupported data type!!!") + + return stringyfied_val + + +def print_array(arr, typ, name): + """ + Converts arrays to a string. + + :param arr: Input array + :param typ: Type of the array. 
+ :param name: Name of the array. + """ + output_string = typ attr = " __attribute__((aligned(sizeof(int32_t)), section(\".l2\"))) " - output_string += attr - output_string += name + '[{}] = {{\n'.format(arr.size) - for (value, count) in zip(arr, range(arr.size)): - if typ in typ_i32b: - output_string += '({}) 0X{:08X}, '.format(typ, value & 0xffffffff) - elif typ in typ_i16b: - output_string += '({}) 0X{:04X}, '.format(typ, value & 0x0000ffff) - elif typ in typ_i8b: - output_string += '({}) 0X{:02X}, '.format(typ, value & 0x000000ff) - elif typ == 'float': - output_string += '({}) {:+.8f}, '.format(typ, value) - elif typ == '__fp16': - output_string += '({}) {:+.4f}, '.format(typ, value) - else: - raise Exception("ERROR: Unsupported data type!!!") - count += 1 - if count % 4 == 0: - output_string += '\n' - output_string = output_string[:-3] - output_string += "};\n\n" + if (arr.size > 1): + output_string += attr + output_string += name + '[{}] = {{\n'.format(arr.size) + for (value, count) in zip(arr, range(arr.size)): + output_string += (format_type(typ, value) + ', ') + count += 1 + if count % 4 == 0: + output_string += '\n' + output_string = output_string[:-3] + output_string += "};\n\n" + else: + output_string += attr + output_string += (name + ' = ' + format_type(typ, arr)) + output_string += ";\n\n" + return output_string @@ -74,8 +95,8 @@ def print_file(header, defines, arrays, filename): output_string = header # Write the defines - for define_name, define_value in defines: - output_string += "#define {} ({})\n".format(define_name, define_value) + for def_key, def_value in defines.items(): + output_string += "#define {} ({})\n".format(def_key, def_value) output_string += "\n" # Add space between defines and arrays # Write the arrays using print_array @@ -90,6 +111,10 @@ def print_file(header, defines, arrays, filename): def get_type(type_string): + """ + Gets the numpy type from the type specifyied in the json + :param type_string: type from json file. 
+ """ if type_string == "int8": return numpy.int8 elif type_string == "int16": @@ -120,107 +145,45 @@ def get_type(type_string): if data_args is not None: my_type = get_type(data_args.get("type")) - defnes = [ast.literal_eval(defne) - for defne in data_args.get("defines")] - arrays = [ast.literal_eval(array) - for array in data_args.get("arrays")] + defnes = dict([ast.literal_eval(defne) + for defne in data_args.get("defines")]) + arrays = [ast.literal_eval(array) for array in data_args.get("arrays")] # Determine output file name filename = os.path.dirname(os.path.abspath(__file__)) filename = os.path.join(filename, "data_{}.h".format(app_name)) - # Generate data header file - if app_name == "axpy_i32": - - result = blas.generate_iaxpy(**{name: value for name, value in defnes}) - arrays = [(result[i], *arrays[i]) for i in range(len(arrays))] - print_file(header, defnes, arrays, filename) - - elif app_name == "cfft_radix4_q16": - - result = cfft.generate_cfft_q16( - **{name: value for name, value in defnes}) - N = defnes[0][1] - defnes += [ - ("LOG2", int(math.log2(N))), - ("N_TWIDDLES", 3 * N // 4), - ("BITREVINDEXTABLE_LENGTH", len(result[3])), - ("TOLERANCE", result[4]), - ] - result = result[0:4] - arrays = [(result[i], *arrays[i]) for i in range(len(arrays))] - print_file(header, defnes, arrays, filename) - - elif app_name == "cfft_radix2_q16": - - result = cfft.generate_cfft_q16( - **{name: value for name, value in defnes}) - N = defnes[0][1] - defnes += [ - ("LOG2", int(math.log2(N))), - ("N_TWIDDLES", 3 * N // 4), - ("BITREVINDEXTABLE_LENGTH", len(result[3])), - ("TOLERANCE", result[4]), - ] - result = result[0:4] - arrays = [(result[i], *arrays[i]) for i in range(len(arrays))] - print_file(header, defnes, arrays, filename) - - elif app_name == "chest_q16": - - result = chest.generate_chest_q16( - **{name: value for name, value in defnes}) - arrays = [(result[i], *arrays[i]) for i in range(len(arrays))] - print_file(header, defnes, arrays, filename) - - elif 
app_name == "cholesky_q32": - - result = blas.generate_qcholesky( - **{name: value for name, value in defnes}) - arrays = [(result[i], *arrays[i]) for i in range(len(arrays))] - print_file(header, defnes, arrays, filename) - - elif app_name == "matmul_f16": - - result = blas.generate_fmatmul( - **{name: value for name, value in defnes}, my_type=my_type) - arrays = [(result[i], *arrays[i]) for i in range(len(arrays))] - print_file(header, defnes, arrays, filename) - - elif app_name == "matmul_f32": - - result = blas.generate_fmatmul( - **{name: value for name, value in defnes}, my_type=my_type) - arrays = [(result[i], *arrays[i]) for i in range(len(arrays))] - print_file(header, defnes, arrays, filename) - - elif app_name == "matmul_i32": - - result = blas.generate_imatmul( - **{name: value for name, value in defnes}, my_type=my_type) - arrays = [(result[i], *arrays[i]) for i in range(len(arrays))] - print_file(header, defnes, arrays, filename) - - elif app_name == "matmul_i16": - - result = blas.generate_imatmul( - **{name: value for name, value in defnes}, my_type=my_type) - arrays = [(result[i], *arrays[i]) for i in range(len(arrays))] - print_file(header, defnes, arrays, filename) - - elif app_name == "matmul_i8": - - result = blas.generate_imatmul( - **{name: value for name, value in defnes}, my_type=my_type) - arrays = [(result[i], *arrays[i]) for i in range(len(arrays))] - print_file(header, defnes, arrays, filename) - - elif (app_name == "fence") | (app_name == "memcpy"): - - result = blas.generate_iarray( - **{name: value for name, value in defnes}, my_type=my_type) - arrays = [(result, *arrays[0])] + # Define function mappings for each app_name + function_map = { + "axpy_i32": {"func": datalib.generate_iaxpy}, + "cfft_radix4_q16": {"func": datalib.generate_cfft_q16}, + "cfft_radix2_q16": {"func": datalib.generate_cfft_q16}, + "chest_q16": {"func": datalib.generate_qchest}, + "cholesky_q32": {"func": datalib.generate_qcholesky}, + "dotp_i32": {"func": 
datalib.generate_idotp}, + "matmul_f16": {"func": datalib.generate_fmatmul}, + "matmul_f32": {"func": datalib.generate_fmatmul}, + "matmul_i32": {"func": datalib.generate_imatmul}, + "matmul_i16": {"func": datalib.generate_imatmul}, + "matmul_i8": {"func": datalib.generate_imatmul}, + "fence": {"func": datalib.generate_iarray}, + "memcpy": {"func": datalib.generate_iarray}, + } + + # Check if app_name exists in the function map + if app_name in function_map: + func_info = function_map[app_name] + func = func_info["func"] + # Call the function + # The defnes dictionary is a function argument in case the generate + # function adds new definitions. + result, defnes = func(defines=defnes, my_type=my_type) + # Print result to data header + if len(arrays) == 1: + arrays = [(result, *arrays[0])] + else: + arrays = [(result[i], *arrays[i]) for i in range(len(arrays))] print_file(header, defnes, arrays, filename) else: - print("No need for data generation.") + print("Data generation is not defined.") diff --git a/software/data/gendatalib.py b/software/data/gendatalib.py new file mode 100644 index 000000000..12c6ee11f --- /dev/null +++ b/software/data/gendatalib.py @@ -0,0 +1,223 @@ +#!/usr/bin/env python3 + +# Copyright 2022 ETH Zurich and University of Bologna. +# Solderpad Hardware License, Version 0.51, see LICENSE for details. +# SPDX-License-Identifier: SHL-0.51 + +# This script generates data for the fp16 matmul. +# Author: Marco Bertuletti + +# The script generates random inputs for the C functions. The inputs are +# propagated though a python golden model. Golden models are from the +# numpy library or the qmath bit-true library. + +import numpy as np +import math +import qmath +from scipy import signal + + +def select_maxval(my_type=np.int32): + size = 8 * np.dtype(my_type).itemsize + MAX = 2**(size - 2) - 1 + return MAX + + +def irandom(size, MAX, my_type=np.int16): + """Generate random numbers. + size (int or tuple): Size of the array to generate. 
+ mytype (np.dtype): Data type for the fixed-point representation. + Defaults to np.int16. + + Returns: + np.ndarray: Array of random fixed-point numbers. + """ + return np.random.randint(-MAX, MAX - 1, size=size, dtype=my_type) + + +def icrandom(size, MAX, my_type=np.int16): + """Generate random complex numbers. + size (int or tuple): Size of the array to generate. + mytype (np.dtype): Data type for the fixed-point representation. + Defaults to np.int16. + + Returns: + np.ndarray: Array of random complex fixed-point numbers. + """ + real_part = np.random.randint(-MAX, MAX - 1, size=size, dtype=my_type) + imag_part = np.random.randint(-MAX, MAX - 1, size=size, dtype=my_type) + return real_part + 1j * imag_part + + +def generate_iarray(my_type=np.float32, defines={}): + + # Create random array of integers + array_N = defines['array_N'] + MAX = select_maxval(my_type) + A = irandom(MAX=MAX, size=(array_N), my_type=my_type) + return [A], defines + + +def generate_fmatmul(my_type=np.float32, defines={}): + + # Create matrix + matrix_M = defines['matrix_M'] + matrix_N = defines['matrix_N'] + matrix_P = defines['matrix_P'] + A = (np.random.rand(matrix_M, matrix_N) - 0.5).astype(my_type) + B = (np.random.rand(matrix_N, matrix_P) - 0.5).astype(my_type) + C = np.matmul(A, B) + + A = np.reshape(A, (matrix_M * matrix_N), order='C').astype(my_type) + B = np.reshape(B, (matrix_N * matrix_P), order='C').astype(my_type) + C = np.reshape(C, (matrix_M * matrix_P), order='C').astype(my_type) + + return [A, B, C], defines + + +def generate_imatmul(my_type=np.int32, defines={}): + + # Create matrix + matrix_M = defines['matrix_M'] + matrix_N = defines['matrix_N'] + matrix_P = defines['matrix_P'] + MAX = select_maxval(my_type) + A = irandom(MAX=MAX, size=(matrix_M, matrix_N), my_type=my_type) + B = irandom(MAX=MAX, size=(matrix_M, matrix_N), my_type=my_type) + C = np.matmul(A, B) + + A = np.reshape(A, (matrix_M * matrix_N), order='C').astype(my_type) + B = np.reshape(B, (matrix_N * 
matrix_P), order='C').astype(my_type) + C = np.reshape(C, (matrix_M * matrix_P), order='C').astype(np.int32) + + return [A, B, C], defines + + +def generate_iaxpy(my_type=np.int32, defines={}): + + # Create matrix + ALPHA = defines['ALPHA'] + array_N = defines['array_N'] + MAX = select_maxval(my_type) + X = irandom(MAX=MAX, size=(array_N), my_type=my_type) + Y = irandom(MAX=MAX, size=(array_N), my_type=my_type) + Z = (Y + X * ALPHA).astype(my_type) + + return [X, Y, Z], defines + + +def generate_idotp(my_type=np.int32, defines={}): + + # Create matrix + array_N = defines['array_N'] + MAX = select_maxval(my_type) + X = irandom(MAX=MAX, size=(array_N), my_type=my_type) + Y = irandom(MAX=MAX, size=(array_N), my_type=my_type) + Z = np.array((np.dot(X, Y))).astype(my_type) + + return [X, Y, Z], defines + + +def generate_iconv(my_type=np.int32, defines={}): + + # Create matrix + matrix_M = defines['matrix_M'] + matrix_N = defines['matrix_N'] + kernel_N = defines['kernel_N'] + MAX = select_maxval(my_type) + X = irandom(MAX=MAX, size=(matrix_M, matrix_N), my_type=my_type) + K = irandom(MAX=MAX, size=(kernel_N, kernel_N), my_type=my_type) + Y = signal.convolve2d(X, K, mode="same", boundary='fill') + + X = X.flatten().astype(my_type) + K = K.flatten().astype(my_type) + Y = Y.flatten().astype(my_type) + + return [X, K, Y], defines + + +def generate_qchest(defines={}, fixed_point=15, my_type=np.int16): + + N_TX = defines['N_TX'] + N_RX = defines['N_RX'] + N_SAMPLES = defines['N_SAMPLES'] + + qvector_pilot_tx = [] + qvector_pilot_rx = [] + qvector_Hest = [] + for k in range(N_SAMPLES): + # Create pilots + pilot_rx = icrandom(size=N_RX, MAX=2**7, my_type=np.int32) + pilot_tx = icrandom(size=N_TX, MAX=2**7, my_type=np.int32) + # Compute Hest + Hest = qmath.qchest(pilot_rx, pilot_tx, fixed_point=8) + + pilot_tx = np.column_stack((pilot_tx.imag, pilot_tx.real)) + pilot_rx = np.column_stack((pilot_rx.imag, pilot_rx.real)) + 
qvector_pilot_tx.append(pilot_tx.astype(np.int16).flatten()) + qvector_pilot_rx.append(pilot_rx.astype(np.int16).flatten()) + qvector_Hest.append(Hest) + + qvector_pilot_tx = np.reshape(qvector_pilot_tx, [2 * N_TX * N_SAMPLES]) + qvector_pilot_rx = np.reshape(qvector_pilot_rx, [2 * N_RX * N_SAMPLES]) + qvector_Hest = np.reshape(qvector_Hest, [2 * N_TX * N_RX * N_SAMPLES]) + return [qvector_pilot_tx, qvector_pilot_rx, qvector_Hest], defines + + +def generate_qcholesky(defines={}, fixed_point=15, my_type=np.int32): + + matrix_N = defines['matrix_N'] + FIXED_POINT = defines['FIXED_POINT'] + + A = irandom(size=(matrix_N, matrix_N), MAX=2**14, my_type=my_type) + y = irandom(size=matrix_N, MAX=2**14, my_type=my_type) + A = qmath.qmatmul(A.T, A, FIXED_POINT, my_type) + L = qmath.qcholesky(A, fixed_point=FIXED_POINT, mytype=my_type) + + A = np.reshape(A, (matrix_N * matrix_N), order='C').astype(my_type) + L = np.reshape(L, (matrix_N * matrix_N), order='C').astype(my_type) + return [A, L, y], defines + + +def generate_cfft_q16(defines={}, fixed_point=15, my_type=np.int16): + + N_CSAMPLES = defines['N_CSAMPLES'] + src = icrandom(size=N_CSAMPLES, MAX=2**fixed_point, my_type=my_type) + tolerance = { + 16: 16, + 32: 20, + 64: 24, + 128: 28, + 256: 32, + 512: 48, + 1024: 64, + 2048: 96, + 4096: 128} + bit_shift_dict_q16 = { + 16: 11, + 32: 10, + 64: 9, + 128: 8, + 256: 7, + 512: 6, + 1024: 5, + 2048: 4, + 4096: 3} + + dst = np.fft.fft(src.astype(np.csingle) / (2**fixed_point)) + dst = dst * 2**(bit_shift_dict_q16[N_CSAMPLES]) + + dst = (np.column_stack((dst.real, dst.imag))).flatten() + src = (np.column_stack((src.real, src.imag))).flatten() + dst = dst.astype(np.int16) + src = src.astype(np.int16) + + twiddles = qmath.qtwiddleCoef(N_CSAMPLES) + bitrever = qmath.bitreversal(N_CSAMPLES, 2) + + defines['LOG2'] = int(math.log2(N_CSAMPLES)) + defines['N_TWIDDLES'] = 3 * N_CSAMPLES // 4 + defines['BITREVINDEXTABLE_LENGTH'] = len(bitrever) + defines['TOLERANCE'] = 
tolerance[N_CSAMPLES] + + return [src, dst, twiddles, bitrever], defines diff --git a/software/data/gendatalib_blas.py b/software/data/gendatalib_blas.py deleted file mode 100644 index 2da05568c..000000000 --- a/software/data/gendatalib_blas.py +++ /dev/null @@ -1,127 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright 2022 ETH Zurich and University of Bologna. -# Solderpad Hardware License, Version 0.51, see LICENSE for details. -# SPDX-License-Identifier: SHL-0.51 - -# This script generates data for the fp16 matmul. -# Author: Marco Bertuletti - -import numpy as np -from scipy import signal - - -def select_maxval(my_type=np.int32): - size = 8 * np.dtype(my_type).itemsize - MAX = 2**(size - 2) - 1 - return MAX - - -def generate_iarray(array_N=16, my_type=np.float32): - - # Create random array of integers - MAX = select_maxval(my_type) - A = np.random.randint(-MAX, MAX - 1, size=(array_N)).astype(my_type) - return A - - -def generate_fmatmul(matrix_M=16, matrix_N=16, - matrix_P=16, my_type=np.float32): - - # Create matrix - A = (np.random.rand(matrix_M, matrix_N) - 0.5).astype(my_type) - B = (np.random.rand(matrix_N, matrix_P) - 0.5).astype(my_type) - C = np.matmul(A, B) - - A = np.reshape(A, (matrix_M * matrix_N), order='C').astype(my_type) - B = np.reshape(B, (matrix_N * matrix_P), order='C').astype(my_type) - C = np.reshape(C, (matrix_M * matrix_P), order='C').astype(my_type) - - return A, B, C - - -def generate_imatmul(matrix_M=16, matrix_N=16, matrix_P=16, my_type=np.int32): - - # Create matrix - MAX = select_maxval(my_type) - A = np.random.randint(-MAX, MAX - 1, size=(matrix_M, matrix_N)) - B = np.random.randint(-MAX, MAX - 1, size=(matrix_M, matrix_N)) - C = np.matmul(A, B) - - A = np.reshape(A, (matrix_M * matrix_N), order='C').astype(my_type) - B = np.reshape(B, (matrix_N * matrix_P), order='C').astype(my_type) - C = np.reshape(C, (matrix_M * matrix_P), order='C').astype(np.int32) - - return A, B, C - - -def qmatmul(A, B, matrix_M=16, matrix_N=16, matrix_P=16, 
- FIXED_POINT=10, my_type=np.int32): - - # fixed-point mul is rounded up - half = 2**FIXED_POINT - 1 - C = np.zeros((matrix_M, matrix_P), dtype=my_type) - for i in range(matrix_M): - for j in range(matrix_N): - for k in range(matrix_P): - C[i][k] += (A[i][j] * B[j][k] + half) / 2**FIXED_POINT - return C - - -def generate_qmatmul(matrix_M=16, matrix_N=16, matrix_P=16, - FIXED_POINT=10, my_type=np.int32): - - # Create matrix - MAX = select_maxval(my_type) - A = np.random.randint(-MAX, MAX - 1, size=(matrix_M, matrix_N)) - B = np.random.randint(-MAX, MAX - 1, size=(matrix_M, matrix_N)) - C = qmatmul(A, B, matrix_M, matrix_N, matrix_P, FIXED_POINT, my_type) - # Cast outputs - A = np.reshape(A, (matrix_M * matrix_N), order='C').astype(my_type) - B = np.reshape(B, (matrix_N * matrix_P), order='C').astype(my_type) - C = np.reshape(C, (matrix_M * matrix_P), order='C').astype(my_type) - - return A, B, C - - -def generate_qcholesky(matrix_N=16, FIXED_POINT=10, my_type=np.int32): - - # Create matrix - MAX = select_maxval(my_type) - A = np.random.randint(-MAX, MAX - 1, size=(matrix_N, matrix_N)) - y = np.random.randint(-MAX, MAX - 1, size=matrix_N) - A = qmatmul(A.T, A, matrix_N, matrix_N, matrix_N, FIXED_POINT, my_type) - L = np.zeros((matrix_N, matrix_N), dtype=my_type) - - # TO_DO: Compute Cholesky Golden model - # TO_DO: Compute Triangular system solver Golden model - - A = np.reshape(A, (matrix_N * matrix_N), order='C').astype(my_type) - L = np.reshape(L, (matrix_N * matrix_N), order='C').astype(my_type) - return A, L, y - - -def generate_iaxpy(ALPHA=6, array_N=1024, my_type=np.int32): - - # Create matrix - MAX = select_maxval(my_type) - X = np.random.randint(-MAX, MAX, size=(array_N)).astype(my_type) - Y = np.random.randint(-MAX, MAX, size=(array_N)).astype(my_type) - Z = (Y + X * ALPHA).astype(my_type) - - return X, Y, Z - - -def generate_iconv(matrix_M=32, matrix_N=32, kernel_N=3, my_type=np.int32): - - # Create matrix - MAX = select_maxval(my_type) - X = 
np.random.randint(-MAX, MAX, size=(matrix_M, matrix_N)).astype(my_type) - K = np.random.randint(-MAX, MAX, size=(kernel_N, kernel_N)).astype(my_type) - Y = signal.convolve2d(X, K, mode="same", boundary='fill') - - X = X.flatten().astype(my_type) - K = K.flatten().astype(my_type) - Y = Y.flatten().astype(my_type) - - return X, K, Y diff --git a/software/data/gendatalib_cfft.py b/software/data/gendatalib_cfft.py deleted file mode 100644 index 6916a532d..000000000 --- a/software/data/gendatalib_cfft.py +++ /dev/null @@ -1,104 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright 2022 ETH Zurich and University of Bologna. -# Solderpad Hardware License, Version 0.51, see LICENSE for details. -# SPDX-License-Identifier: SHL-0.51 - -# This script generates data for the cfft kernel. -# Author: Marco Bertuletti - -import numpy as np -import math as M -from sympy.combinatorics import Permutation - - -def generate_twiddleCoefq15(N): - PI = 3.14159265358979 - twiddleCoefq15 = np.zeros((int)(2 * 3 * N / 4), np.int16) - for i in range(0, (int)(3 * N / 4)): - twiddleCoefq15_cos = M.cos(i * 2 * PI / N) - twiddleCoefq15_sin = M.sin(i * 2 * PI / N) - twiddleCoefq15[2 * i] = int(round(twiddleCoefq15_cos * (2**15 - 1))) - twiddleCoefq15[2 * i + - 1] = int(round(twiddleCoefq15_sin * (2**15 - 1))) - return twiddleCoefq15 - - -def generate_bitreversal(N, R): - # Decompose - logR2 = [] - idx = N - while (idx >= R): - logR2.append(int(M.log2(R))) - idx = idx // R - if (idx > 1): - logR2.append(int(M.log2(idx))) - # Bitreversal - indexes = [] - for x in range(N): - result = 0 - for bits in logR2: - mask = (0xffffffff >> (32 - bits)) - result = (result << bits) | (x & mask) - x = x >> bits - indexes.append(result) - # Create transpositions table - tps = [] - for c in Permutation.from_sequence(indexes).cyclic_form: - for i in range(len(c) - 1): - tps.append([c[i] * 8, c[-1] * 8]) - return np.ndarray.flatten(np.array(tps)) - - -def generate_cfft_q16(N_CSAMPLES): - # Q16: - # len=16: Q1.15 -> Q5.11 - 
# len=32: Q1.15 -> Q6.10 - # len=64: Q1.15 -> Q7.9 - # len=128: Q1.15 -> Q8.8 - # len=256: Q1.15 -> Q9.7 - # len=512: Q1.15 -> Q10.6 - # len=1024: Q1.15 -> Q11.5 - # len=2048: Q1.15 -> Q12.4 - # len=4096: Q1.15 -> Q13.3 - MAX = 2**(15) - src = (np.random.randint(-MAX, MAX - 1, 2 * - N_CSAMPLES, dtype=np.int16)).astype(np.int16) - tolerance = { - 16: 16, - 32: 20, - 64: 24, - 128: 28, - 256: 32, - 512: 48, - 1024: 64, - 2048: 96, - 4096: 128} - bit_shift_dict_q16 = { - 16: 11, - 32: 10, - 64: 9, - 128: 8, - 256: 7, - 512: 6, - 1024: 5, - 2048: 4, - 4096: 3} - my_fixpoint = 15 - dst = np.zeros(2 * N_CSAMPLES, dtype=np.int16) - complex_src = np.zeros(N_CSAMPLES, dtype=np.csingle) - complex_dst = np.zeros(N_CSAMPLES, dtype=np.csingle) - for i in range(N_CSAMPLES): - shift = 2**(my_fixpoint) - complex_src[i] = (src[2 * i].astype(np.csingle) / shift) + \ - 1j * (src[2 * i + 1].astype(np.csingle) / shift) - complex_dst = np.fft.fft(complex_src) - for i in range(N_CSAMPLES): - shift = 2**(bit_shift_dict_q16[N_CSAMPLES]) - dst[2 * i] = (np.real(complex_dst[i]) * shift).astype(np.int16) - dst[2 * i + 1] = (np.imag(complex_dst[i]) * shift).astype(np.int16) - - twiddles = generate_twiddleCoefq15(N_CSAMPLES) - bitrever = generate_bitreversal(N_CSAMPLES, 2) - - return src, dst, twiddles, bitrever, tolerance[N_CSAMPLES] diff --git a/software/data/gendatalib_chest.py b/software/data/gendatalib_chest.py deleted file mode 100755 index ae197723b..000000000 --- a/software/data/gendatalib_chest.py +++ /dev/null @@ -1,77 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright 2022 ETH Zurich and University of Bologna. -# Solderpad Hardware License, Version 0.51, see LICENSE for details. -# SPDX-License-Identifier: SHL-0.51 - -# This script generates data for the Channel estimation. 
-# Author: Marco Bertuletti - -import numpy as np - - -def q_sat(x): - if x > 2**15 - 1: - return x - 2**16 - elif x < -2**15: - return x + 2**16 - else: - return x - - -def compute_chest_q16(in_rx, in_tx, p): - n_rx = in_rx.size - n_tx = in_tx.size - result = np.zeros(2 * (n_tx * n_rx), dtype=np.int16) - for i in range(n_rx): - a_r = in_rx[i].real - a_i = in_rx[i].imag - for j in range(n_tx): - b_r = in_tx[j].real - b_i = in_tx[j].imag - -# # Compute data division -# den = (2**16) // (b_r * b_r + b_i * b_i) -# num_r = (a_r * b_r) + (a_i * b_i) -# num_i = (a_i * b_r) - (a_r * b_i) -# result[2 * (i * n_tx + j)] = q_sat((num_r * den) // 2**p) -# result[2 * (i * n_tx + j) + 1] = q_sat((num_i * den) // 2**p) - - # Compute data multiplication - num_r = (a_r * b_r) - (a_i * b_i) - num_i = (a_i * b_r) + (a_r * b_i) - result[2 * (i * n_tx + j)] = q_sat(num_r // 2**p) - result[2 * (i * n_tx + j) + 1] = q_sat(num_i // 2**p) - return result - - -def generate_chest_q16(N_TX, N_RX, N_SAMPLES): - FIXED_POINT = 8 - MAX = 2**7 - - qvector_pilot_tx = [] - qvector_pilot_rx = [] - qvector_Hest = [] - for k in range(N_SAMPLES): - # Create pilots - pilot_rx = np.random.randint(-MAX, MAX - 1, size=N_RX) + 1j * \ - np.random.randint(-MAX, MAX - 1, size=N_RX) - pilot_tx = np.random.randint(-MAX, MAX - 1, size=N_TX) + 1j * \ - np.random.randint(-MAX, MAX - 1, size=N_TX) - # Compute Hest - Hest = compute_chest_q16(pilot_rx, pilot_tx, FIXED_POINT) - - pilot_tx = np.column_stack( - (pilot_tx.imag, pilot_tx.real)).astype( - np.int16).flatten() - pilot_rx = np.column_stack( - (pilot_rx.imag, pilot_rx.real)).astype( - np.int16).flatten() - qvector_pilot_tx.append(pilot_tx) - qvector_pilot_rx.append(pilot_rx) - qvector_Hest.append(Hest) - - qvector_pilot_tx = np.reshape(qvector_pilot_tx, [2 * N_TX * N_SAMPLES]) - qvector_pilot_rx = np.reshape(qvector_pilot_rx, [2 * N_RX * N_SAMPLES]) - qvector_Hest = np.reshape(qvector_Hest, [2 * N_TX * N_RX * N_SAMPLES]) - return qvector_pilot_tx, 
qvector_pilot_rx, qvector_Hest diff --git a/software/data/qmath.py b/software/data/qmath.py new file mode 100644 index 000000000..165354691 --- /dev/null +++ b/software/data/qmath.py @@ -0,0 +1,446 @@ +#!/usr/bin/env python3 + +# Copyright 2022 ETH Zurich and University of Bologna. +# Solderpad Hardware License, Version 0.51, see LICENSE for details. +# SPDX-License-Identifier: SHL-0.51 + +# This script generates data for the fp16 mmse. +# Author: Marco Bertuletti + +import numpy as np +import math +from sympy.combinatorics import Permutation + + +def to_fixed_point(matrix, fixed_point=15, mytype=np.int16): + """Convert a complex matrix to a fixed-point matrix. + matrix (np.ndarray): Input complex matrix. + fixed_point (int): Number of bits for the fractional part. + mytype (np.dtype): Data type for the fixed-point representation. + + Returns: + tuple: Real and imaginary parts of the fixed-point matrix. + """ + SCALE_FACTOR = 2**fixed_point + real_part = np.round(matrix.real * SCALE_FACTOR).astype(mytype) + imag_part = np.round(matrix.imag * SCALE_FACTOR).astype(mytype) + if (np.abs(real_part.any()) > 2**(fixed_point - 1)): + raise ValueError("Overflow") + if (np.abs(imag_part.any()) > 2**(fixed_point - 1)): + raise ValueError("Overflow") + return real_part, imag_part + + +def from_fixed_point(real_part, imag_part, fixed_point=15, mytype=np.int16): + """Convert a fixed-point matrix back to a floating-point complex matrix. + real_part (np.ndarray): Real part of the fixed-point matrix. + imag_part (np.ndarray): Imaginary part of the fixed-point matrix. + fixed_point (int): Number of bits for the fractional part. + mytype (np.dtype): Data type for the fixed-point representation. + + Returns: + np.ndarray: Reconstructed complex matrix. + """ + SCALE_FACTOR = 2**fixed_point + return (real_part / SCALE_FACTOR) + 1j * (imag_part / SCALE_FACTOR) + + +def qmatmul(A, B, fixed_point=15, mytype=np.int16): + """Perform fixed-point matrix multiplication. 
def qcmatmul(A_real, A_imag, B_real, B_imag, fixed_point=15, mytype=np.int16):
    """Multiply two complex matrices in fixed-point arithmetic.

    Each elementary complex product is rescaled by ``2**fixed_point`` with
    floor division before it is accumulated into the result.

    Args:
        A_real (np.ndarray): Real part of the left matrix.
        A_imag (np.ndarray): Imaginary part of the left matrix.
        B_real (np.ndarray): Real part of the right matrix.
        B_imag (np.ndarray): Imaginary part of the right matrix.
        fixed_point (int): Number of bits for the fractional part.
        mytype (np.dtype): Data type of the returned matrices.

    Returns:
        tuple: ``(C_real, C_imag)``, both of dtype ``mytype``.
    """
    SCALE_FACTOR = 2**fixed_point
    # Widen the operands: the product of two fixed-point values does not
    # fit the storage type, and NumPy integer arithmetic would otherwise
    # wrap (or, on NumPy >= 2, refuse to combine a narrow scalar with
    # SCALE_FACTOR).
    A_real = np.asarray(A_real, dtype=np.int64)
    A_imag = np.asarray(A_imag, dtype=np.int64)
    B_real = np.asarray(B_real, dtype=np.int64)
    B_imag = np.asarray(B_imag, dtype=np.int64)

    rows_A, cols_A = A_real.shape
    cols_B = B_real.shape[1]

    C_real = np.zeros((rows_A, cols_B), dtype=mytype)
    C_imag = np.zeros((rows_A, cols_B), dtype=mytype)

    for i in range(rows_A):
        for j in range(cols_B):
            for k in range(cols_A):
                real_product = (A_real[i, k] * B_real[k, j]
                                - A_imag[i, k] * B_imag[k, j])
                imag_product = (A_real[i, k] * B_imag[k, j]
                                + A_imag[i, k] * B_real[k, j])

                C_real[i, j] += real_product // SCALE_FACTOR
                C_imag[i, j] += imag_product // SCALE_FACTOR

    return C_real, C_imag


def qcmvmul(A_real, A_imag, B_real, B_imag, fixed_point=15, mytype=np.int16):
    """Multiply a complex matrix by a complex vector in fixed point.

    Each elementary complex product is rescaled by ``2**fixed_point`` with
    floor division before it is accumulated into the result.

    Args:
        A_real (np.ndarray): Real part of the matrix.
        A_imag (np.ndarray): Imaginary part of the matrix.
        B_real (np.ndarray): Real part of the vector.
        B_imag (np.ndarray): Imaginary part of the vector.
        fixed_point (int): Number of bits for the fractional part.
        mytype (np.dtype): Data type of the returned vectors.

    Returns:
        tuple: ``(C_real, C_imag)``, one entry per matrix row, both of
        dtype ``mytype``.
    """
    SCALE_FACTOR = 2**fixed_point
    # Widen the operands so that the products cannot overflow.
    A_real = np.asarray(A_real, dtype=np.int64)
    A_imag = np.asarray(A_imag, dtype=np.int64)
    B_real = np.asarray(B_real, dtype=np.int64)
    B_imag = np.asarray(B_imag, dtype=np.int64)

    rows_A, cols_A = A_real.shape

    C_real = np.zeros(rows_A, dtype=mytype)
    C_imag = np.zeros(rows_A, dtype=mytype)

    for i in range(rows_A):
        for k in range(cols_A):
            real_product = A_real[i, k] * B_real[k] - A_imag[i, k] * B_imag[k]
            imag_product = A_real[i, k] * B_imag[k] + A_imag[i, k] * B_real[k]

            C_real[i] += real_product // SCALE_FACTOR
            C_imag[i] += imag_product // SCALE_FACTOR

    return C_real, C_imag


def qsqrt(n, fixed_point=15, mytype=np.int16):
    """Compute a fixed-point square root with Newton-Raphson iterations.

    The iteration starts from 1.0 (``2**fixed_point``) and stops when the
    estimate no longer changes or after 11 updates, whichever comes first.
    Because of that cap the result for inputs that converge slowly (for
    example 0) is whatever the last update produced.

    Args:
        n (int or np.ndarray): Input value(s) in fixed-point representation.
        fixed_point (int): Number of bits for the fractional part.
        mytype (np.dtype): Kept for interface compatibility. The iteration
            runs in 64-bit integers because the scaled radicand does not
            fit a narrow type; callers cast when they store the result.

    Returns:
        np.int64 or np.ndarray: Square root of the input in fixed-point
        representation (same shape as ``n``).
    """
    SCALE_FACTOR = 2**fixed_point
    n_one = np.asarray(n, dtype=np.int64) * SCALE_FACTOR
    x = np.full(np.shape(n_one), SCALE_FACTOR, dtype=np.int64)

    for _ in range(11):  # at most 11 Newton updates
        x_old = x
        x = (x + n_one // x) // 2
        if np.array_equal(x, x_old):  # converged
            break
    return x


def qcholesky(A, fixed_point=15, mytype=np.int16):
    """Compute the fixed-point Cholesky factor of a real matrix.

    Only the diagonal and the entries below it are read from ``A``.
    Negative pivots are clamped to zero before the square root is taken.

    Args:
        A (np.ndarray): Square input matrix in fixed-point representation.
        fixed_point (int): Number of bits for the fractional part.
        mytype (np.dtype): Data type of the returned matrix.

    Returns:
        np.ndarray: Lower triangular factor ``L`` of dtype ``mytype``;
        entries above the diagonal are zero.

    Raises:
        ValueError: If ``A`` is not square.
    """
    SCALE_FACTOR = 2**fixed_point
    A = np.asarray(A, dtype=np.int64)
    rows, columns = A.shape
    if rows != columns:
        raise ValueError("Matrix must be square for Cholesky decomposition.")

    L = np.zeros((rows, columns), dtype=mytype)

    for row in range(rows):
        # Entries above the diagonal keep their initial value of zero.
        for column in range(row + 1):
            pivot = A[row, column]
            if row == column:
                for k in range(column):
                    # Read back through int64 so the square cannot wrap.
                    Ljk = np.int64(L[row, k])
                    pivot -= (Ljk**2) // SCALE_FACTOR
                if pivot < 0:
                    pivot = 0
                L[row, column] = qsqrt(pivot, fixed_point, mytype)
            else:
                for k in range(column):
                    Lik = np.int64(L[row, k])
                    Ljk = np.int64(L[column, k])
                    pivot -= (Lik * Ljk) // SCALE_FACTOR
                diag = np.int64(L[column, column])
                L[row, column] = (pivot * SCALE_FACTOR) // diag

    return L


def qccholesky(M_real, M_imag, fixed_point=15, mytype=np.int16):
    """Compute the fixed-point Cholesky factor of a complex matrix.

    Only the diagonal (real part) and the entries below it are read from
    the input. A slightly negative pivot, down to
    ``-(fixed_point**2 + 1)``, is treated as rounding noise and clamped to
    zero; anything more negative is an error.

    Args:
        M_real (np.ndarray): Real part of the square input matrix.
        M_imag (np.ndarray): Imaginary part of the square input matrix.
        fixed_point (int): Number of bits for the fractional part.
        mytype (np.dtype): Data type of the returned matrices.

    Returns:
        tuple: ``(L_real, L_imag)``, the real and imaginary parts of the
        lower triangular factor, both of dtype ``mytype``.

    Raises:
        ValueError: If the matrix is not square, or if a pivot is below
            the negative tolerance.
    """
    SCALE_FACTOR = 2**fixed_point
    # Tolerance for negative pivots. It must itself be negative: with a
    # positive threshold every negative pivot raised and the clamp below
    # could never run.
    NEGATIVE = -(fixed_point**2 + 1)

    M_real = np.asarray(M_real, dtype=np.int64)
    M_imag = np.asarray(M_imag, dtype=np.int64)
    rows, columns = M_real.shape
    if rows != columns:
        raise ValueError("Matrix must be square for Cholesky decomposition.")

    L_real = np.zeros_like(M_real, dtype=mytype)
    L_imag = np.zeros_like(M_imag, dtype=mytype)

    for row in range(rows):
        # Entries above the diagonal keep their initial value of zero.
        for column in range(row + 1):
            if row == column:
                # Diagonal element: real by construction.
                real_pivot = M_real[row, column]
                for k in range(column):
                    real_Ljk = np.int64(L_real[row, k])
                    imag_Ljk = np.int64(L_imag[row, k])
                    product = (real_Ljk**2 + imag_Ljk**2) // SCALE_FACTOR
                    real_pivot = real_pivot - product

                if real_pivot < 0:
                    if real_pivot < NEGATIVE:
                        raise ValueError("Negative value encountered.")
                    real_pivot = 0
                L_real[row, column] = qsqrt(real_pivot, fixed_point, mytype)
            else:
                # Off-diagonal element (below the diagonal).
                real_pivot = M_real[row, column]
                imag_pivot = M_imag[row, column]

                # NOTE(review): the product uses L[column, k] as stored,
                # without conjugation - confirm this matches the kernel
                # under test.
                for k in range(column):
                    real_Lik = np.int64(L_real[row, k])
                    imag_Lik = np.int64(L_imag[row, k])
                    real_Ljk = np.int64(L_real[column, k])
                    imag_Ljk = np.int64(L_imag[column, k])
                    real_product = (
                        real_Lik * real_Ljk - imag_Lik * imag_Ljk
                    ) // SCALE_FACTOR
                    imag_product = (
                        real_Lik * imag_Ljk + imag_Lik * real_Ljk
                    ) // SCALE_FACTOR
                    real_pivot = real_pivot - real_product
                    imag_pivot = imag_pivot - imag_product

                diag = np.int64(L_real[column, column])
                L_real[row, column] = (real_pivot * SCALE_FACTOR) // diag
                L_imag[row, column] = (imag_pivot * SCALE_FACTOR) // diag

    return L_real, L_imag


def qinvertLt(M_real, M_imag, y_real, y_imag, fixed_point=15, mytype=np.int16):
    """Solve ``M x = y`` for a lower triangular complex matrix ``M``.

    Uses forward substitution in fixed-point arithmetic. Only the real
    part of the diagonal is used as divisor.

    Args:
        M_real (np.ndarray): Real part of the lower triangular matrix.
        M_imag (np.ndarray): Imaginary part of the lower triangular matrix.
        y_real (np.ndarray): Real part of the right-hand side vector.
        y_imag (np.ndarray): Imaginary part of the right-hand side vector.
        fixed_point (int): Number of bits for the fractional part.
        mytype (np.dtype): Data type of the returned vectors.

    Returns:
        tuple: ``(x_real, x_imag)``, both of dtype ``mytype``.
    """
    SCALE_FACTOR = 2**fixed_point
    # Widen the operands so that the products cannot overflow.
    M_real = np.asarray(M_real, dtype=np.int64)
    M_imag = np.asarray(M_imag, dtype=np.int64)
    y_real = np.asarray(y_real, dtype=np.int64)
    y_imag = np.asarray(y_imag, dtype=np.int64)

    n = M_real.shape[0]
    x_real = np.zeros_like(y_real, dtype=mytype)
    x_imag = np.zeros_like(y_imag, dtype=mytype)

    for i in range(n):
        sum_real = y_real[i]
        sum_imag = y_imag[i]
        for j in range(i):
            xr = np.int64(x_real[j])
            xi = np.int64(x_imag[j])
            sum_real -= (M_real[i, j] * xr - M_imag[i, j] * xi) // SCALE_FACTOR
            sum_imag -= (M_real[i, j] * xi + M_imag[i, j] * xr) // SCALE_FACTOR

        x_real[i] = (sum_real * SCALE_FACTOR) // M_real[i, i]
        x_imag[i] = (sum_imag * SCALE_FACTOR) // M_real[i, i]

    return x_real, x_imag


def qinvertUt(M_real, M_imag, y_real, y_imag, fixed_point=15, mytype=np.int16):
    """Solve ``M x = y`` for an upper triangular complex matrix ``M``.

    Uses backward substitution in fixed-point arithmetic. Only the real
    part of the diagonal is used as divisor.

    Args:
        M_real (np.ndarray): Real part of the upper triangular matrix.
        M_imag (np.ndarray): Imaginary part of the upper triangular matrix.
        y_real (np.ndarray): Real part of the right-hand side vector.
        y_imag (np.ndarray): Imaginary part of the right-hand side vector.
        fixed_point (int): Number of bits for the fractional part.
        mytype (np.dtype): Data type of the returned vectors.

    Returns:
        tuple: ``(x_real, x_imag)``, both of dtype ``mytype``.
    """
    SCALE_FACTOR = 2**fixed_point
    # Widen the operands so that the products cannot overflow.
    M_real = np.asarray(M_real, dtype=np.int64)
    M_imag = np.asarray(M_imag, dtype=np.int64)
    y_real = np.asarray(y_real, dtype=np.int64)
    y_imag = np.asarray(y_imag, dtype=np.int64)

    n = M_real.shape[0]
    x_real = np.zeros_like(y_real, dtype=mytype)
    x_imag = np.zeros_like(y_imag, dtype=mytype)

    for i in range(n - 1, -1, -1):
        sum_real = y_real[i]
        sum_imag = y_imag[i]
        for j in range(i + 1, n):
            xr = np.int64(x_real[j])
            xi = np.int64(x_imag[j])
            sum_real -= (M_real[i, j] * xr - M_imag[i, j] * xi) // SCALE_FACTOR
            sum_imag -= (M_real[i, j] * xi + M_imag[i, j] * xr) // SCALE_FACTOR

        x_real[i] = (sum_real * SCALE_FACTOR) // M_real[i, i]
        x_imag[i] = (sum_imag * SCALE_FACTOR) // M_real[i, i]

    return x_real, x_imag


def qtwiddleCoef(N, fixed_point=15, mytype=np.int16):
    """Generate fixed-point twiddle coefficients for an FFT.

    Produces ``3 * N / 4`` interleaved (cos, sin) pairs of the angles
    ``2 * pi * i / N``, scaled by ``2**fixed_point - 1``.

    Args:
        N (int): Number of points in the FFT.
        fixed_point (int): Number of bits for the fractional part.
        mytype (np.dtype): Data type for the fixed-point representation.

    Returns:
        np.ndarray: Interleaved cosine/sine coefficients of dtype
        ``mytype``.
    """
    # Truncated constant kept as written so that regenerated tables stay
    # identical to previously generated ones.
    PI = 3.14159265358979
    scale = 2**fixed_point - 1
    twiddleCoefq15 = np.zeros((int(2 * 3 * N / 4)), dtype=mytype)
    for i in range(int(3 * N / 4)):
        twiddleCoefq15_cos = math.cos(i * 2 * PI / N)
        twiddleCoefq15_sin = math.sin(i * 2 * PI / N)
        twiddleCoefq15[2 * i] = int(round(twiddleCoefq15_cos * scale))
        twiddleCoefq15[2 * i + 1] = int(round(twiddleCoefq15_sin * scale))
    return twiddleCoefq15


def bitreversal(N, R):
    """Build the bit-reversal transposition table for a radix-R FFT.

    Args:
        N (int): Number of points in the FFT.
        R (int): Radix of the FFT decomposition (at least 2).

    Returns:
        np.ndarray: Flattened table of index pairs to swap.

    Raises:
        ValueError: If ``R`` is smaller than 2 (the decomposition loop
            would not terminate).
    """
    if R < 2:
        raise ValueError("Radix R must be at least 2.")

    # Decompose N into digit widths (in bits), one per FFT stage.
    logR2 = []
    idx = N
    while idx >= R:
        logR2.append(int(math.log2(R)))
        idx = idx // R
    if idx > 1:
        logR2.append(int(math.log2(idx)))

    # Reverse the digit order of every index.
    indexes = []
    for x in range(N):
        result = 0
        for bits in logR2:
            mask = (1 << bits) - 1
            result = (result << bits) | (x & mask)
            x = x >> bits
        indexes.append(result)

    # Turn each permutation cycle into swaps against its last element.
    # NOTE(review): the factor 8 looks like a byte stride per element -
    # confirm against the kernel that consumes the table.
    tps = []
    for c in Permutation.from_sequence(indexes).cyclic_form:
        for i in range(len(c) - 1):
            tps.append([c[i] * 8, c[-1] * 8])
    return np.ndarray.flatten(np.array(tps))


def q_sat(x):
    """Fold a value that left the signed 16-bit range back into it.

    Despite the name the value is not clamped: ``2**16`` is added or
    subtracted once, which reproduces two's-complement wrap-around for
    values at most one period outside the range.

    Args:
        x (int or float): Value to fold.

    Returns:
        int or float: ``x`` itself if it lies in ``[-2**15, 2**15 - 1]``,
        otherwise ``x`` shifted by ``2**16`` towards the range.
    """
    if x > 2**15 - 1:
        return x - 2**16
    elif x < -2**15:
        return x + 2**16
    else:
        return x


def qchest(in_rx, in_tx, division=False, fixed_point=8, mytype=np.int16):
    """Perform fixed-point complex channel estimation (CHEST).

    Every received sample is combined with every transmitted sample,
    either by multiplication or by division. The result for the pair
    ``(i, j)`` is stored as interleaved real/imaginary values at index
    ``2 * (i * n_tx + j)``.

    Args:
        in_rx (np.ndarray): Received signal array (complex numbers).
        in_tx (np.ndarray): Transmitted signal array (complex numbers).
        division (bool): Divide rx by tx instead of multiplying them.
            Defaults to False.
        fixed_point (int): Number of bits for the fractional part.
            Defaults to 8.
        mytype (np.dtype): Data type for the fixed-point representation.
            Defaults to np.int16.

    Returns:
        np.ndarray: Interleaved real/imaginary results of dtype ``mytype``.

    Raises:
        ZeroDivisionError: In division mode, if a transmitted sample has
            zero power or its scaled reciprocal rounds down to zero.
    """
    SCALE_FACTOR = 2**fixed_point
    n_rx = in_rx.size
    n_tx = in_tx.size

    # Resulting array (real and imaginary interleaved)
    result = np.zeros(2 * (n_tx * n_rx), dtype=mytype)

    for i in range(n_rx):
        a_r = in_rx[i].real
        a_i = in_rx[i].imag
        for j in range(n_tx):
            b_r = in_tx[j].real
            b_i = in_tx[j].imag
            out = 2 * (i * n_tx + j)

            if division:
                # Compute data division
                power = b_r * b_r + b_i * b_i
                if power == 0:
                    # NumPy floats would otherwise yield an infinite
                    # reciprocal instead of an error.
                    raise ZeroDivisionError(
                        "Division by zero encountered in CHEST.")
                den = (2**16) // power
                if den == 0:
                    raise ZeroDivisionError(
                        "Division by zero encountered in CHEST.")
                num_r = (a_r * b_r + a_i * b_i)
                num_i = (a_i * b_r - a_r * b_i)
                result[out] = (num_r // den) * SCALE_FACTOR
                result[out + 1] = (num_i // den) * SCALE_FACTOR
            else:
                # Compute data multiplication
                num_r = (a_r * b_r - a_i * b_i)
                num_i = (a_i * b_r + a_r * b_i)
                result[out] = q_sat(num_r // SCALE_FACTOR)
                result[out + 1] = q_sat(num_i // SCALE_FACTOR)

    return result