Commit c0b98e82 authored by Thomas Gambier's avatar Thomas Gambier 🚴🏼

autopep8

parent 88e24d90
......@@ -4,6 +4,7 @@ from scipy.signal import get_window
from scipy.fft import fft, fftshift
from scipy import signal
def nmse(xMes, xEst):
"""
Function to compute the NMSE in time domain
......@@ -14,6 +15,7 @@ def nmse(xMes, xEst):
######################################
def algoLMS(PPHI, out, start, nSample):
"""
Classical LMS Algorithm
......@@ -33,8 +35,8 @@ def algoLMS(PPHI, out, start, nSample):
########################################
def computeFFT(x, fs, win):
def computeFFT(x, fs, win):
"""
Perform FFT on x
x : signal
......@@ -52,13 +54,14 @@ def computeFFT(x, fs, win):
fftSig = fft(x * w) / fft_block_size
fftAbs = np.abs(fftSig)
freq = (np.arange(0, N) - N/2) * fs / fft_block_size
freq = fftshift(freq) # Shift zero frequency component to center of spectrum
# Shift zero frequency component to center of spectrum
freq = fftshift(freq)
return fftAbs, freq
#########################################
def toPlotSpectrum(x, fs, win, Nblocks, interval):
def toPlotSpectrum(x, fs, win, Nblocks, interval):
"""
To plot the spectrum
x : signal
......@@ -90,15 +93,14 @@ def toPlotSpectrum(x, fs, win, Nblocks, interval):
else: # spectre entre [-fs/2 fs/2]
pxx = np.abs(fft_input)
pxx = fftshift(pxx)
freq = (np.arange(0, fft_block_size) - fft_block_size/2) * fs / fft_block_size
freq = (np.arange(0, fft_block_size) -
fft_block_size/2) * fs / fft_block_size
return pxx, freq
#################################################
def corelation(tx, rx, osf):
"""
Performs tx/rx synchronization and signal alignement
tx : transmitted signal
......@@ -130,7 +132,8 @@ def corelation(tx, rx, osf):
#################################################
def xcorr(x,y):
def xcorr(x, y):
"""
Perform Cross-Correlation on x and y
x : 1st signal
......@@ -148,9 +151,10 @@ def xcorr(x,y):
#################################################
def coarsePhaseCorrection(x, y, osf):
## Coarse phase correction using the preambule (zeroing AM/PM)
# Coarse phase correction using the preambule (zeroing AM/PM)
preambule = 5*80 # 2 STF + 2 LTF + 1 SIG
n = int(osf*preambule)
......@@ -163,8 +167,8 @@ def coarsePhaseCorrection(x, y, osf):
##################################################
def amam_ampm_plot(tx, ty):
def amam_ampm_plot(tx, ty):
"""
Perform AM/AM and AM/PM curves
ttx : transmitted signal
......@@ -179,13 +183,14 @@ def amam_ampm_plot(tx, ty):
plt.xlabel('Normalized input')
plt.ylabel('Normalized output')
plt.subplot(212)
plt.plot(np.abs(tx), np.angle(np.conj(tx) * ty),'.')
plt.plot(np.abs(tx), np.angle(np.conj(tx) * ty), '.')
plt.title('Dynamical AM/PM')
plt.xlabel('Normalized input')
plt.ylabel('Dephase (deg)')
###################################################
def identif_dpd_GMP(x, y, dpdModel):
dpdModel = estimGMPopt(y, x, dpdModel)
......@@ -195,8 +200,8 @@ def identif_dpd_GMP(x, y, dpdModel):
#################################################
def estimGMPopt(x, y, gmpModel):
def estimGMPopt(x, y, gmpModel):
"""
To estimate GMP coefficients
x : reference signal
......@@ -229,7 +234,8 @@ def estimGMPopt(x, y, gmpModel):
for m in range(1, Mb + 1):
it1 = np.concatenate((np.zeros(l + m), x[:nSample-l-m]))
for k in range(1, Kb + 1):
PPHI[:, La * Ka + (Lb * Mb * (k - 1)) + (Mb * l) + (m-1)] = (np.abs(it1))**k * it2
PPHI[:, La * Ka + (Lb * Mb * (k - 1)) +
(Mb * l) + (m-1)] = (np.abs(it1))**k * it2
for l in range(Lc):
it2 = np.concatenate((np.zeros(l), x[:nSample-l]))
......@@ -239,7 +245,8 @@ def estimGMPopt(x, y, gmpModel):
else:
it1 = np.concatenate((np.zeros(m - l), x[m-l:nSample])) * 0
for k in range(1, Kc + 1):
PPHI[:, La * Ka + Lb * Kb * Mb + (Lc * Mc * (k - 1)) + (Mc * l) + (m-1)] = (np.abs(it1))**k * it2
PPHI[:, La * Ka + Lb * Kb * Mb +
(Lc * Mc * (k - 1)) + (Mc * l) + (m-1)] = (np.abs(it1))**k * it2
start = 0
gmpModel['coefGMP'], gmpModel['H'] = algoLMS(PPHI, y, start, nSample)
......@@ -249,8 +256,8 @@ def estimGMPopt(x, y, gmpModel):
######################################################
def simGMPopt(x, gmpModel):
def simGMPopt(x, gmpModel):
"""
Simulates GMP model
x : input signal (stimulus)
......@@ -282,7 +289,8 @@ def simGMPopt(x, gmpModel):
for m in range(1, Mb + 1):
it1 = np.concatenate((np.zeros(l + m), x[:nSample-l-m]))
for k in range(1, Kb + 1):
PPHI[:, La * Ka + (Lb * Mb * (k - 1)) + (Mb * l) + (m-1)] = (np.abs(it1))**k * it2
PPHI[:, La * Ka + (Lb * Mb * (k - 1)) +
(Mb * l) + (m-1)] = (np.abs(it1))**k * it2
for l in range(Lc):
it2 = np.concatenate((np.zeros(l), x[:nSample-l]))
......@@ -292,7 +300,8 @@ def simGMPopt(x, gmpModel):
else:
it1 = np.concatenate((np.zeros(m - l), x[m-l:nSample])) * 0
for k in range(1, Kc + 1):
PPHI[:, La * Ka + Lb * Kb * Mb + (Lc * Mc * (k - 1)) + (Mc * l) + (m-1)] = (np.abs(it1))**k * it2
PPHI[:, La * Ka + Lb * Kb * Mb +
(Lc * Mc * (k - 1)) + (Mc * l) + (m-1)] = (np.abs(it1))**k * it2
outEst = np.zeros(nSample, dtype=np.complex128)
outEst = PPHI @ gmpModel['coefGMP']
......@@ -300,6 +309,3 @@ def simGMPopt(x, gmpModel):
return outEst
##################################################
......@@ -33,6 +33,7 @@
# In[47]:
from commonFunctions import * # XLIM used functions
import numpy as np
import adi # API for Adalm-Pluto
import matplotlib.pyplot as plt
......@@ -50,9 +51,6 @@ start = time.time() # start time to measure the execution time
# In[48]:
from commonFunctions import * # XLIM used functions
# ## Transmission without DPD
# ### Transmit original samples
......@@ -69,8 +67,8 @@ osf = 3
filePreambule = "preambule_BW20osf" + str(osf) + "_single.bin"
fileRx = "lteBW20osf" + str(osf) + "_single.bin"
sample_rate = 61.44e6 # 61.44e6 46.08e6 30.72e6 23.04e6 11.52e6
#fileRx = "wlanBW20osf" + str(osf) + "_single.bin"
#sample_rate = 60e6
# fileRx = "wlanBW20osf" + str(osf) + "_single.bin"
# sample_rate = 60e6
fp = open(path + filePreambule, 'rb')
preambule = np.fromfile(fp, dtype=np.complex64)
......@@ -107,10 +105,11 @@ sdr.sample_rate = sample_rate
freq = int(3650)
win_fft = 'flattop' # The used window for FFT ans spectrum plot
initialGain = int(0) # Here, the initial gain of TX is set to the minimal value for safety
# Here, the initial gain of TX is set to the minimal value for safety
initialGain = int(0)
sdr.tx_rf_bandwidth = int(sample_rate) # filter cutoff
sdr.tx_lo = freq*10**6
sdr.tx_hardwaregain_chan0 = initialGain #
sdr.tx_hardwaregain_chan0 = initialGain
sdr.tx_destroy_buffer() # Stop transmitting for safe
sdr.tx_cyclic_buffer = True # Enable cyclic buffers
......@@ -128,9 +127,10 @@ while (valid):
valid = input("1 to set txGain || 0 to compute DPD : ")
valid = bool(int(valid))
if valid == True:
value = input(f"enter new txGain (Actual txGain = {sdr.tx_hardwaregain_chan0}) : ")
value = input(
f"enter new txGain (Actual txGain = {sdr.tx_hardwaregain_chan0}) : ")
gain = float(value)
if gain < safetyGain: ## Gain limitation for safety
if gain < safetyGain: # Gain limitation for safety
sdr.tx_hardwaregain_chan0 = gain
else:
sdr.tx_hardwaregain_chan0 = safetyGain
......@@ -145,7 +145,8 @@ sdr.rx_lo = sdr.tx_lo
sdr.rx_rf_bandwidth = sdr.tx_rf_bandwidth
sdr.rx_buffer_size = 3*num_samps # 3 times to capture one sequence
sdr.gain_control_mode_chan0 = 'manual'
sdr.rx_hardwaregain_chan0 = 8 # set the receive gain, but be careful not to saturate the ADC
# set the receive gain, but be careful not to saturate the ADC
sdr.rx_hardwaregain_chan0 = 8
# Clear buffer just to be safe
......@@ -153,7 +154,7 @@ sdr.rx_hardwaregain_chan0 = 8 # set the receive gain, but be careful not to satu
# In[54]:
for i in range (0, 10):
for i in range(0, 10):
raw_data = sdr.rx()
......@@ -164,8 +165,9 @@ for i in range (0, 10):
rx_samples = sdr.rx()
avg_pwr = np.mean(np.abs(rx_samples/coeff)**2)
avg_pwr_dB = round(10*np.log10(avg_pwr),2)
print("check_rx = ", avg_pwr_dB, ": Best in -30 +/- 5. Else, adjust the rx_hardwaregain")
avg_pwr_dB = round(10*np.log10(avg_pwr), 2)
print("check_rx = ", avg_pwr_dB,
": Best in -30 +/- 5. Else, adjust the rx_hardwaregain")
# Synchronization and coarse phase correction using the preambule (zeroing AM/PM)
......@@ -173,13 +175,15 @@ print("check_rx = ", avg_pwr_dB, ": Best in -30 +/- 5. Else, adjust the rx_hardw
# In[56]:
[paIn, paOut] = corelation(samples, rx_samples, osf) # Synchronization using cross-corelation
# Synchronization using cross-corelation
[paIn, paOut] = corelation(samples, rx_samples, osf)
# In[57]:
paOut = coarsePhaseCorrection(paIn, paOut, osf) # Coarse phase correction using the preambule (zeroing AM/PM)
# Coarse phase correction using the preambule (zeroing AM/PM)
paOut = coarsePhaseCorrection(paIn, paOut, osf)
paIn /= np.max(np.abs(paIn)) # scaling
paOut /= np.max(np.abs(paOut)) # scaling
......@@ -193,7 +197,8 @@ paOut /= np.max(np.abs(paOut)) # scaling
plt.figure(10)
plt.plot((f + freq)/1e6, 10*np.log10(pxxWithout))
plt.grid()
plt.xlabel('Frequency (MHz)'); plt.ylabel('Spectrum')
plt.xlabel('Frequency (MHz)')
plt.ylabel('Spectrum')
plt.legend(['Original signal'])
plt.show()
......@@ -220,13 +225,15 @@ plt.show()
dpdModel = {'fs': sample_rate}
[Ka, La, Kb, Lb, Mb, Kc, Lc, Mc] = [7, 1, 0, 1, 1, 0, 1, 1] # set the dpd orders and memory depths
dpdModel['parameters'] = {'Ka':Ka, 'La':La, 'Kb':Kb, 'Lb':Lb, 'Mb':Mb, 'Kc':Kc, 'Lc':Lc, 'Mc':Mc};
# set the dpd orders and memory depths
[Ka, La, Kb, Lb, Mb, Kc, Lc, Mc] = [7, 1, 0, 1, 1, 0, 1, 1]
dpdModel['parameters'] = {'Ka': Ka, 'La': La, 'Kb': Kb,
'Lb': Lb, 'Mb': Mb, 'Kc': Kc, 'Lc': Lc, 'Mc': Mc}
gainATT = 1
dpdModel = identif_dpd_GMP(paIn, paOut/gainATT, dpdModel)
print("nmse of identification :", np.round(dpdModel['nmseTimedB'],2), " dB")
print("nmse of identification :", np.round(dpdModel['nmseTimedB'], 2), " dB")
dpdOutput = simGMPopt(paIn, dpdModel) # generate DPD signal
dpdOutput = dpdOutput/np.max(np.abs(dpdOutput))
......@@ -253,9 +260,10 @@ while (valid):
valid = input("1 to set txGain || 0 to compute DPD : ")
valid = bool(int(valid))
if valid == True:
value = input(f"enter new txGain (Actual txGain = {sdr.tx_hardwaregain_chan0}) : ")
value = input(
f"enter new txGain (Actual txGain = {sdr.tx_hardwaregain_chan0}) : ")
gain = float(value)
if gain < safetyGain: ## Gain limitation for safety
if gain < safetyGain: # Gain limitation for safety
sdr.tx_hardwaregain_chan0 = gain
else:
sdr.tx_hardwaregain_chan0 = safetyGain
......@@ -271,8 +279,9 @@ while (valid):
rx_samples = sdr.rx()
avg_pwr = np.mean(np.abs(rx_samples/coeff)**2)
avg_pwr_dB = round(10*np.log10(avg_pwr),2)
print("check_rx = ", avg_pwr_dB, ": Best in -30 +/- 5. Else, adjust the rx_hardwaregain")
avg_pwr_dB = round(10*np.log10(avg_pwr), 2)
print("check_rx = ", avg_pwr_dB,
": Best in -30 +/- 5. Else, adjust the rx_hardwaregain")
# Synchronization of input and output
......@@ -315,7 +324,8 @@ plt.subplot(223)
plt.plot(np.real(paIn)-np.real(paLin))
plt.title("Error in phase (I)")
plt.xlabel("Samples")
plt.subplot(224); plt.plot(np.imag(paIn)-np.imag(paLin))
plt.subplot(224)
plt.plot(np.imag(paIn)-np.imag(paLin))
plt.title("Error in qudadrature (Q)")
plt.xlabel("Samples")
plt.show()
......@@ -330,7 +340,9 @@ plt.show()
plt.figure(10)
plt.plot((f + freq)/1e6, 10*np.log10(pxxWith))
plt.grid()
plt.xlabel('Frequency (MHz)'); plt.ylabel('Spectrum'); plt.legend(['With DPD'])
plt.xlabel('Frequency (MHz)')
plt.ylabel('Spectrum')
plt.legend(['With DPD'])
plt.show()
......@@ -356,5 +368,4 @@ print(f'Elapsed time : {elapsed:.2}ms')
for i in dpdModel['coefGMP']:
print(np.round(i,3))
print(np.round(i, 3))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment