-
Notifications
You must be signed in to change notification settings - Fork 3
/
generate_channel.py
213 lines (178 loc) · 10.6 KB
/
generate_channel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
import numpy as np
import scipy.io as sio
import random
# random.seed(1)
def generate_location(num_users):
    """
    Draw random user positions on the ground plane around the point (50, 0).

    Every user gets a uniformly drawn angle in [0, 2*pi) and a uniformly drawn
    radius in [0, 25); the height coordinate is always 0.

    :param num_users: number of positions to draw
    :return: array (num_users, 3), one (x, y, z) row per user
    """
    # One (angle, radius) pair of uniform draws per user; the row-major fill
    # consumes the random stream in the order angle_0, radius_0, angle_1, ...
    draws = np.random.rand(num_users, 2)
    angle = draws[:, 0] * 2 * np.pi
    radius = draws[:, 1] * 25

    positions = np.empty([num_users, 3])
    positions[:, 0] = np.cos(angle) * radius + 50
    positions[:, 1] = np.sin(angle) * radius - 0
    positions[:, 2] = 0
    return positions
def path_loss(d, alpha=2, L_0=-30):
    """
    Return the amplitude scaling caused by distance-dependent path loss.

    The power gain is 10**(L_0/10) * d**(-alpha); its square root is returned
    so the result can multiply a channel coefficient directly.

    :param d: distance, scalar or array-like (integer inputs are accepted)
    :param alpha: path-loss exponent
    :param L_0: power gain in dB at d = 1
    :return: square root of the power gain; a numpy scalar for scalar d,
             otherwise an array with the shape of d
    """
    # NumPy refuses to raise integer scalars/arrays to a negative integer
    # power, so the distance is converted to floating point first.
    distance = np.asarray(d, dtype=float)
    loss = np.sqrt(10 ** (L_0 / 10) * distance ** (-alpha))
    return loss
def generate_pathloss_aoa_aod(location_user, location_bs, location_irs, alpha=(3.2, 2.5, 2.2), L_0=-10):
    """
    Compute path-loss amplitudes and direction cosines for the BS-IRS,
    IRS-user and BS-user links.

    :param location_user: array (num_user, 3), (x, y, z) of every user
    :param location_bs: array (3,), (x, y, z) of the BS
    :param location_irs: array (3,), (x, y, z) of the IRS
    :param alpha: path-loss exponents indexed as (BS-user, BS-IRS, IRS-user)
    :param L_0: reference gain in dB handed to path_loss for every link
    :return: pathloss = (pathloss_irs_bs, pathloss_irs_user, pathloss_bs_user)
                 scalar, array (num_user,), array (num_user, 1)
             aoa_aod = (aoa_bs, aod_irs_y, aod_irs_z, aoa_irs_y, aoa_irs_z)
                 three scalars followed by two arrays (num_user,)
    """
    location_user = np.asarray(location_user, dtype=float)
    location_bs = np.asarray(location_bs, dtype=float)
    location_irs = np.asarray(location_irs, dtype=float)
    num_user = location_user.shape[0]

    # ========bs-irs==============
    d0 = np.linalg.norm(location_bs - location_irs)
    pathloss_irs_bs = path_loss(d0, alpha[1], L_0)
    # Direction cosines: coordinate difference divided by the link length.
    aoa_bs = (location_irs[0] - location_bs[0]) / d0
    aod_irs_y = (location_bs[1] - location_irs[1]) / d0
    aod_irs_z = (location_bs[2] - location_irs[2]) / d0

    # =========irs-user=============
    # Row k holds the vector pointing from user k to the IRS.
    offset_irs_user = location_irs - location_user
    d_irs_user = np.linalg.norm(offset_irs_user, axis=1)
    pathloss_irs_user = path_loss(d_irs_user, alpha[2], L_0)
    aoa_irs_y = offset_irs_user[:, 1] / d_irs_user
    aoa_irs_z = offset_irs_user[:, 2] / d_irs_user

    # =========bs-user=============
    d_bs_user = np.linalg.norm(location_user - location_bs, axis=1)
    # Kept as a column vector, which is the shape callers reshape from.
    pathloss_bs_user = np.reshape(path_loss(d_bs_user, alpha[0], L_0), [num_user, 1])

    pathloss = (pathloss_irs_bs, pathloss_irs_user, pathloss_bs_user)
    aoa_aod = (aoa_bs, aod_irs_y, aod_irs_z, aoa_irs_y, aoa_irs_z)
    return pathloss, aoa_aod
def generate_channel(params_system, location_bs=np.array([-80, 0, 30]), location_irs=np.array([10, 0, 10]),
                     location_user_initial=None, Rician_factor=10, num_samples=100, irs_Nh=10, L_0=-30,
                     alpha=(3.6, 2.5, 2.2)):
    """
    Draw random realizations of the BS-user, IRS-user and BS-IRS channels.

    The BS-user link is pure complex Gaussian fading. The BS-IRS and IRS-user
    links add a deterministic array-response (line-of-sight) term to complex
    Gaussian fading, weighted by Rician_factor. Every link is scaled by its
    path-loss amplitude.

    :param params_system: (num_antenna_bs, num_elements_irs, num_user)
    :param location_bs: array (3,), position of the BS
    :param location_irs: array (3,), position of the IRS
    :param location_user_initial: array (num_user, 3) reused for every sample,
                                  or None to draw new user positions per sample
    :param Rician_factor: weight of the line-of-sight term relative to the
                          fading term (power ratio)
    :param num_samples: number of channel realizations
    :param irs_Nh: IRS element n gets index n mod irs_Nh along y and
                   floor(n / irs_Nh) along z
    :param L_0: reference gain in dB forwarded to generate_pathloss_aoa_aod
    :param alpha: path-loss exponents forwarded to generate_pathloss_aoa_aod
    :return: channels = (channel_bs_user, channel_irs_user, channel_bs_irs)
                 arrays (num_samples, num_antenna_bs, num_user),
                        (num_samples, num_elements_irs, num_user),
                        (num_samples, num_antenna_bs, num_elements_irs)
             set_location_user = list with the user locations of every sample
    """
    (num_antenna_bs, num_elements_irs, num_user) = params_system

    def complex_gaussian(shape):
        # Unit-variance complex Gaussian; the real part is drawn first.
        real = np.random.normal(loc=0, scale=np.sqrt(0.5), size=shape)
        imag = np.random.normal(loc=0, scale=np.sqrt(0.5), size=shape)
        return real + 1j * imag

    # Quantities that do not change from sample to sample.
    weight_los = np.sqrt(Rician_factor / (1 + Rician_factor))
    weight_nlos = np.sqrt(1 / (1 + Rician_factor))
    antenna_index = np.arange(num_antenna_bs)
    element_index = np.arange(num_elements_irs)
    i1 = np.mod(element_index, irs_Nh)
    i2 = np.floor(element_index / irs_Nh)

    channel_bs_irs, channel_bs_user, channel_irs_user, set_location_user = [], [], [], []
    for _ in range(num_samples):
        if location_user_initial is None:
            location_user = generate_location(num_user)
        else:
            location_user = location_user_initial
        set_location_user.append(location_user)

        pathloss, aoa_aod = generate_pathloss_aoa_aod(location_user, location_bs, location_irs,
                                                      alpha=alpha, L_0=L_0)
        (pathloss_irs_bs, pathloss_irs_user, pathloss_bs_user) = pathloss
        (aoa_bs, aod_irs_y, aod_irs_z, aoa_irs_y, aoa_irs_z) = aoa_aod

        # (num_antenna_bs, num_user) channel between BS and user
        h_bs_user = complex_gaussian([num_antenna_bs, num_user])
        h_bs_user = h_bs_user * np.reshape(pathloss_bs_user, [1, num_user])
        channel_bs_user.append(h_bs_user)

        # (num_antenna_bs, num_elements_irs) channel between IRS and BS
        h_bs_irs = complex_gaussian([num_antenna_bs, num_elements_irs])
        a_bs = np.reshape(np.exp(1j * np.pi * aoa_bs * antenna_index), [num_antenna_bs, 1])
        a_irs_bs = np.reshape(np.exp(1j * np.pi * (i1 * aod_irs_y + i2 * aod_irs_z)), [num_elements_irs, 1])
        los_irs_bs = a_bs @ np.transpose(a_irs_bs.conjugate())
        h_bs_irs = weight_los * los_irs_bs + weight_nlos * h_bs_irs
        h_bs_irs = h_bs_irs * pathloss_irs_bs
        channel_bs_irs.append(h_bs_irs)

        # (num_elements_irs, num_user) channel between IRS and user
        h_irs_user = complex_gaussian([num_elements_irs, num_user])
        # Column k is the IRS array response toward user k.
        a_irs_user = np.exp(1j * np.pi * (np.outer(i1, aoa_irs_y) + np.outer(i2, aoa_irs_z)))
        h_irs_user = weight_los * a_irs_user + weight_nlos * h_irs_user
        h_irs_user = h_irs_user * np.reshape(pathloss_irs_user, [1, num_user])
        channel_irs_user.append(h_irs_user)

    channels = (np.array(channel_bs_user), np.array(channel_irs_user), np.array(channel_bs_irs))
    return channels, set_location_user
def generate_channel_with_array_response(params_system, location_bs=np.array([-80, 0, 30]),
                                         location_irs=np.array([10, 0, 10]), location_user_initial=None,
                                         Rician_factor=10, num_samples=100, irs_Nh=10, L_0=-30,
                                         alpha=(3.6, 2.5, 2.2)):
    """
    Draw random channel realizations and also return the relative IRS array
    response between the BS direction and every user direction.

    The BS-user link is pure complex Gaussian fading. The BS-IRS and IRS-user
    links add a deterministic array-response (line-of-sight) term to complex
    Gaussian fading, weighted by Rician_factor. Every link is scaled by its
    path-loss amplitude.

    :param params_system: (num_antenna_bs, num_elements_irs, num_user)
    :param location_bs: array (3,), position of the BS
    :param location_irs: array (3,), position of the IRS
    :param location_user_initial: array (num_user, 3) reused for every sample,
                                  or None to draw new user positions per sample
    :param Rician_factor: weight of the line-of-sight term relative to the
                          fading term (power ratio)
    :param num_samples: number of channel realizations
    :param irs_Nh: IRS element n gets index n mod irs_Nh along y and
                   floor(n / irs_Nh) along z
    :param L_0: reference gain in dB forwarded to generate_pathloss_aoa_aod
    :param alpha: path-loss exponents forwarded to generate_pathloss_aoa_aod
    :return: channels = (channel_bs_user, channel_irs_user, channel_bs_irs)
                 arrays (num_samples, num_antenna_bs, num_user),
                        (num_samples, num_elements_irs, num_user),
                        (num_samples, num_antenna_bs, num_elements_irs)
             set_location_user = list with the user locations of every sample
             re_a_irs_bs = array (num_elements_irs, num_user) computed from
                 the LAST sample; column k equals the IRS response toward the
                 BS times the conjugate of the IRS response toward user k.
                 None when num_samples is 0.
    """
    (num_antenna_bs, num_elements_irs, num_user) = params_system

    def complex_gaussian(shape):
        # Unit-variance complex Gaussian; the real part is drawn first.
        real = np.random.normal(loc=0, scale=np.sqrt(0.5), size=shape)
        imag = np.random.normal(loc=0, scale=np.sqrt(0.5), size=shape)
        return real + 1j * imag

    # Quantities that do not change from sample to sample.
    weight_los = np.sqrt(Rician_factor / (1 + Rician_factor))
    weight_nlos = np.sqrt(1 / (1 + Rician_factor))
    antenna_index = np.arange(num_antenna_bs)
    element_index = np.arange(num_elements_irs)
    i1 = np.mod(element_index, irs_Nh)
    i2 = np.floor(element_index / irs_Nh)

    channel_bs_irs, channel_bs_user, channel_irs_user, set_location_user = [], [], [], []
    # Stays None if the loop never runs, instead of being an unbound name.
    re_a_irs_bs = None
    for _ in range(num_samples):
        if location_user_initial is None:
            location_user = generate_location(num_user)
        else:
            location_user = location_user_initial
        set_location_user.append(location_user)

        pathloss, aoa_aod = generate_pathloss_aoa_aod(location_user, location_bs, location_irs,
                                                      alpha=alpha, L_0=L_0)
        (pathloss_irs_bs, pathloss_irs_user, pathloss_bs_user) = pathloss
        (aoa_bs, aod_irs_y, aod_irs_z, aoa_irs_y, aoa_irs_z) = aoa_aod

        # (num_antenna_bs, num_user) channel between BS and user
        h_bs_user = complex_gaussian([num_antenna_bs, num_user])
        h_bs_user = h_bs_user * np.reshape(pathloss_bs_user, [1, num_user])
        channel_bs_user.append(h_bs_user)

        # (num_antenna_bs, num_elements_irs) channel between IRS and BS
        h_bs_irs = complex_gaussian([num_antenna_bs, num_elements_irs])
        a_bs = np.reshape(np.exp(1j * np.pi * aoa_bs * antenna_index), [num_antenna_bs, 1])
        a_irs_bs = np.reshape(np.exp(1j * np.pi * (i1 * aod_irs_y + i2 * aod_irs_z)), [num_elements_irs, 1])
        # One column per user. The previous reshape to (num_elements_irs, 1)
        # only worked for a single user and raised for num_user > 1.
        re_a_irs_bs = np.exp(1j * np.pi * (np.outer(i1, aod_irs_y - aoa_irs_y)
                                           + np.outer(i2, aod_irs_z - aoa_irs_z)))
        los_irs_bs = a_bs @ np.transpose(a_irs_bs.conjugate())
        h_bs_irs = weight_los * los_irs_bs + weight_nlos * h_bs_irs
        h_bs_irs = h_bs_irs * pathloss_irs_bs
        channel_bs_irs.append(h_bs_irs)

        # (num_elements_irs, num_user) channel between IRS and user
        h_irs_user = complex_gaussian([num_elements_irs, num_user])
        # Column k is the IRS array response toward user k.
        a_irs_user = np.exp(1j * np.pi * (np.outer(i1, aoa_irs_y) + np.outer(i2, aoa_irs_z)))
        h_irs_user = weight_los * a_irs_user + weight_nlos * h_irs_user
        h_irs_user = h_irs_user * np.reshape(pathloss_irs_user, [1, num_user])
        channel_irs_user.append(h_irs_user)

    channels = (np.array(channel_bs_user), np.array(channel_irs_user), np.array(channel_bs_irs))
    return channels, set_location_user, re_a_irs_bs
def channel_complex2real(channels):
    """
    Form the cascaded BS-IRS-user channel and real-valued stacked versions
    of the cascaded and direct channels.

    :param channels: (channel_bs_user, channel_irs_user, channel_bs_irs) with
                     shapes (num_sample, num_antenna_bs, num_user),
                            (num_sample, num_elements_irs, num_user),
                            (num_sample, num_antenna_bs, num_elements_irs)
    :return: A_T_real: array (num_sample, 2*num_elements_irs, 2*num_antenna_bs, num_user);
                 for every user the block matrix [[Re, Im], [-Im, Re]] of the
                 transposed cascaded channel
             Hd_real: real and imaginary parts of channel_bs_user stacked
                 along the antenna axis
             cascaded channel: complex array
                 (num_sample, num_antenna_bs, num_elements_irs, num_user)
    """
    channel_bs_user, channel_irs_user, channel_bs_irs = channels
    (num_sample, num_antenna_bs, num_elements_irs) = channel_bs_irs.shape
    num_user = channel_irs_user.shape[2]

    # Element-wise product of the BS-IRS channel with each user's IRS-user
    # channel, done for all users at once through broadcasting.
    irs_user_4d = channel_irs_user.reshape(num_sample, 1, num_elements_irs, num_user)
    cascaded = np.zeros([num_sample, num_antenna_bs, num_elements_irs, num_user], dtype=complex)
    cascaded[...] = channel_bs_irs[:, :, :, np.newaxis] * irs_user_4d

    # Swap the antenna and IRS-element axes, then lay out the real blocks.
    cascaded_t = np.transpose(cascaded, (0, 2, 1, 3))
    upper_rows = np.concatenate([cascaded_t.real, cascaded_t.imag], axis=2)
    lower_rows = np.concatenate([-cascaded_t.imag, cascaded_t.real], axis=2)
    A_T_real = np.concatenate([upper_rows, lower_rows], axis=1)

    Hd_real = np.concatenate([channel_bs_user.real, channel_bs_user.imag], axis=1)
    return A_T_real, Hd_real, cascaded
# def main():
# num_test = 1000
# num_antenna_bs, num_elements_irs, num_user = 8, 100, 3
# params_system = (num_antenna_bs, num_elements_irs, num_user)
# Rician_factor = 10
# location_user = None
# channel_true, set_location_user = generate_channel(params_system,Rician_factor=Rician_factor,
# num_samples=num_test)
# _, _, channel_bs_irs_user = channel_complex2real(channel_true)
# print('channel_bs_user:\n',np.mean(np.abs(channel_true[0])**2))
# print('channel_irs_user:\n',np.mean(np.abs(channel_true[1])**2))
# print('channel_bs_irs:\n',np.mean(np.abs(channel_true[2])**2))
# print('channel_bs_irs_user:\n',np.mean(np.abs(channel_bs_irs_user)**2))
# if __name__ == '__main__':
# main()