From 3c3d269c2416c35fdb6c8a5843bb39ad427a36e7 Mon Sep 17 00:00:00 2001 From: JdeJong Date: Wed, 13 Nov 2019 12:46:29 +0100 Subject: [PATCH 01/13] small conventions fix, reference to file changed --- examples/generate_fake.py | 2 +- pipeline/pipeline.py | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/examples/generate_fake.py b/examples/generate_fake.py index 36bb5b8..b395d2f 100644 --- a/examples/generate_fake.py +++ b/examples/generate_fake.py @@ -26,7 +26,7 @@ b'nbits': 8 } -filename = './examples/pspm.fil' +filename = './examples/pspm32.fil' # generate a fake filterbank file generate_file(filename, header) diff --git a/pipeline/pipeline.py b/pipeline/pipeline.py index f77b22e..213cd35 100644 --- a/pipeline/pipeline.py +++ b/pipeline/pipeline.py @@ -5,6 +5,7 @@ import os import sys import inspect + CURRENT_DIR = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) PARENT_DIR = os.path.dirname(CURRENT_DIR) sys.path.insert(0, PARENT_DIR) @@ -15,6 +16,7 @@ import dedisperse import fourier + # pylint: disable=too-many-locals # pylint: disable=too-many-arguments # pylint: disable=invalid-name @@ -46,7 +48,6 @@ def __init__(self, filename=None, as_stream=False, DM=230, scale=3, n=None, size file.write(str(result) + ",") file.close() - def read_rows(self, filename): """ Read the filterbank data as stream @@ -62,7 +63,6 @@ def read_rows(self, filename): time_stop = timer() - time_start return time_stop - def read_n_rows(self, n, filename, DM, scale): """ Read the filterbank data as stream @@ -85,7 +85,6 @@ def read_n_rows(self, n, filename, DM, scale): stopwatch_list.append(stopwatch) return stopwatch_list - def read_static(self, filename, DM, scale, size): """ Read the filterbank data at once @@ -106,7 +105,6 @@ def read_static(self, filename, DM, scale, size): stopwatch = self.measure_methods(stopwatch, fil_data, freqs, DM, scale) return stopwatch - def measure_methods(self, stopwatch, fil_data, freqs, DM, scale): """ Run and time all methods/modules From 8dc479ab008b901b88184ed78a674736bfe97180 Mon Sep 17 00:00:00 2001 From: JdeJong Date: Thu, 14 Nov 2019 13:49:52 +0100 Subject: [PATCH 02/13] created script for making filterbank files readable to 'oomans --- write_filterbank_data_readable.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 write_filterbank_data_readable.py diff --git a/write_filterbank_data_readable.py b/write_filterbank_data_readable.py new file mode 100644 index 0000000..c99173f --- /dev/null +++ b/write_filterbank_data_readable.py @@ -0,0 +1,29 @@ +import filterbank.filterbank as filterbank +import filterbank.header as header +import filterbank.filterbank as filterbank + +f= open("filReadable.txt","w+") + +fb = filterbank.Filterbank(filename='./pspm32.fil', read_all=True) +f.write('header: \n') +f.write(str(fb.get_header()) + '\n') +f.write('setup_channels: \n') +f.write(str(fb.setup_chans()) + '\n') +f.write('number of channels:\n') +f.write(str(fb.n_chans) + '\n') +f.write('number of samples: \n') +f.write(str(fb.n_samples) + '\n') +f.write('frequencies: \n') +i = 0 +for freq in fb.get_freqs(): + f.write(str(i) + ': ' + str(freq) + '\n') + i += 1 +f.write('data: \n') +i = 0 +for data in fb.data: + f.write(str(i) + ': ' + str(data) + '\n') + +# doesnt work: +# for iterator in range(len(fb.get_freqs())): +# f.write(str(fb.get_freqs()[iterator]) + '\n') +# f.write(str(fb.data()[iterator]) + '\n') From 47cc389247d815c1147f4e7132066bd4d60f381c Mon Sep 17 00:00:00 2001 From: JdeJong Date: Thu, 14 Nov 2019 13:51:18 +0100 Subject: [PATCH 03/13] corrected error --- write_filterbank_data_readable.py | 1 + 1 file changed, 1 insertion(+) diff --git a/write_filterbank_data_readable.py b/write_filterbank_data_readable.py index c99173f..d9a2131 100644 --- a/write_filterbank_data_readable.py +++ b/write_filterbank_data_readable.py @@ -22,6 +22,7 @@ i = 0 for data in fb.data: f.write(str(i) + ': ' + str(data) + '\n') + i += 1 # doesnt work: # for iterator in range(len(fb.get_freqs())): From 02a096277262dccb7a5290a43f9b8bf6f2135748 Mon Sep 17 00:00:00 2001 From: "B.J.J. Lochan" Date: Sat, 16 Nov 2019 17:17:51 +0100 Subject: [PATCH 04/13] Created a input stream echobot server --- examples/testStreamServer.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 examples/testStreamServer.py diff --git a/examples/testStreamServer.py b/examples/testStreamServer.py new file mode 100644 index 0000000..f3aea4a --- /dev/null +++ b/examples/testStreamServer.py @@ -0,0 +1,30 @@ +import asyncio + +async def handle_echo(reader, writer): + data = await reader.read(100) + message = data.decode() + addr = writer.get_extra_info('peername') + + print(f"Received {message!r} from {addr!r}") + + print(f"Send: {message!r}") + writer.write(data) + await writer.drain() + + await handle_echo(reader, writer) + + # print("Close the connection") + # writer.close() + +async def main(): + server = await asyncio.start_server( + handle_echo, '127.0.0.1', 8080) + + addr = server.sockets[0].getsockname() + print(f'Serving on {addr}') + + + async with server: + await server.serve_forever() + +asyncio.run(main()) \ No newline at end of file From 8150b29595aa84609cff871cce5ff96d83c87509 Mon Sep 17 00:00:00 2001 From: Brian Lochan Date: Fri, 22 Nov 2019 11:39:03 +0100 Subject: [PATCH 05/13] added some examples for working with input stream and RTLSDR --- examples/input_stream.py | 23 +++++++++++++++++ examples/input_stream_rtlsdr.py | 18 ++++++++++++++ examples/interactive_rtlsdr.py | 34 ++++++++++++++++++++++++++ examples/visualize_psd.py | 2 +- examples/visualize_waterfall_stream.py | 10 +++++--- 5 files changed, 82 insertions(+), 5 deletions(-) create mode 100644 examples/input_stream.py create mode 100644 examples/input_stream_rtlsdr.py create mode 100644 examples/interactive_rtlsdr.py diff --git a/examples/input_stream.py b/examples/input_stream.py new file mode 100644 index 0000000..07971e0 --- /dev/null +++ b/examples/input_stream.py @@ -0,0 +1,23 @@ +""" + Example of plotting a Power Spectral Density plot, using filterbank data +""" +import asyncio +from filterbank.filterbank import Filterbank + +async def streaming(): + # Instatiate the filterbank reader and point to the filterbank file + fb = Filterbank(filename='./pspm32.fil', read_all=True) + + # read the data in the filterbank file + f, samples = fb.select_data() + + # Get the powerlevels and the frequencies + for i in range(len(samples)): + # for j in range(len(samples[i])): + # print(samples[i][j]) + print(samples[i]) + print("") + await asyncio.sleep(1) + +loop = asyncio.get_event_loop() +loop.run_until_complete(streaming()) \ No newline at end of file diff --git a/examples/input_stream_rtlsdr.py b/examples/input_stream_rtlsdr.py new file mode 100644 index 0000000..5235b97 --- /dev/null +++ b/examples/input_stream_rtlsdr.py @@ -0,0 +1,18 @@ +import asyncio +from rtlsdr import RtlSdr + +async def streaming(): + sdr = RtlSdr() + + async for samples in sdr.stream(): + print(samples[1:5]) + # ... + + # to stop streaming: + await sdr.stop() + + # done + sdr.close() + +loop = asyncio.get_event_loop() +loop.run_until_complete(streaming()) \ No newline at end of file diff --git a/examples/interactive_rtlsdr.py b/examples/interactive_rtlsdr.py new file mode 100644 index 0000000..bf04d08 --- /dev/null +++ b/examples/interactive_rtlsdr.py @@ -0,0 +1,34 @@ +from matplotlib import pyplot as plt +import matplotlib.animation as animation +from rtlsdr import RtlSdr +import numpy as np + +sdr = RtlSdr() +# configure device +sdr.sample_rate = 2.4e6 # Hz +sdr.center_freq = 94.7e6 # Hz +sdr.freq_correction = 60 # PPM +sdr.gain = 'auto' + +fig = plt.figure() +graph_out = fig.add_subplot(1, 1, 1) + + +def animate(i): + graph_out.clear() + #samples = sdr.read_samples(256*1024) + samples = sdr.read_samples(128*1024) + # use matplotlib to estimate and plot the PSD + graph_out.psd(samples, NFFT=1024, Fs=sdr.sample_rate / + 1e6, Fc=sdr.center_freq/1e6) + #graph_out.xlabel('Frequency (MHz)') + #graph_out.ylabel('Relative power (dB)') + + +try: + ani = animation.FuncAnimation(fig, animate, interval=10) + plt.show(block=True) +except KeyboardInterrupt: + pass +finally: + sdr.close() \ No newline at end of file diff --git a/examples/visualize_psd.py b/examples/visualize_psd.py index bc2a744..8cb5f8c 100644 --- a/examples/visualize_psd.py +++ b/examples/visualize_psd.py @@ -45,4 +45,4 @@ # Plot the PSD plt.plot(freqs, power_levels) -plt.show() +plt.show(block=True) diff --git a/examples/visualize_waterfall_stream.py b/examples/visualize_waterfall_stream.py index 7d93953..c913986 100644 --- a/examples/visualize_waterfall_stream.py +++ b/examples/visualize_waterfall_stream.py @@ -8,14 +8,16 @@ from filterbank.filterbank import Filterbank from plot import waterfall import pylab as pyl +import matplotlib.pyplot as plt from plot.plot import next_power_of_2 +fb = Filterbank(filename='./pspm8.fil', read_all=True) +f, samples = fb.select_data() -fb = Filterbank(filename='./pspm32.fil') - -wf = waterfall.Waterfall(fb=fb, fig=pyl.figure(), mode="stream") +wf = waterfall.Waterfall(filter_bank=fb, fig=pyl.figure(), mode='stream', samples=samples) fig, update, frames, repeat = wf.animated_plotter() ani = animation.FuncAnimation(fig, update, frames=frames,repeat=repeat) -pyl.show() +plt.ion() +plt.show(block=True) From 4d3925f7c238adb4215947940814d364701ce428 Mon Sep 17 00:00:00 2001 From: Brian Lochan Date: Thu, 28 Nov 2019 16:35:07 +0100 Subject: [PATCH 06/13] Added an asyncroncally example for the filterbank files --- examples/read_async_example.py | 47 ++++++++++++++++++++++++++++++++++ filterbank/filterbank.py | 19 +++++++++----- 2 files changed, 60 insertions(+), 6 deletions(-) create mode 100644 examples/read_async_example.py diff --git a/examples/read_async_example.py b/examples/read_async_example.py new file mode 100644 index 0000000..3921666 --- /dev/null +++ b/examples/read_async_example.py @@ -0,0 +1,47 @@ +""" + Example of plotting a Power Spectral Density plot, using filterbank data +""" +# pylint: disable-all +import os +import sys +import inspect +import numpy as np +import matplotlib.pyplot as plt + +CURRENT_DIR = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) +PARENT_DIR = os.path.dirname(CURRENT_DIR) +sys.path.insert(0, PARENT_DIR) + +from plot import opsd +from filterbank.header import read_header +from filterbank.filterbank import Filterbank +import asyncio +import time + +async def create_filterbank(): + filterbank = Filterbank(filename='./pspm32.fil', read_all=True) + await filterbank._init() + + return filterbank + +async def main(): +# Instatiate the filterbank reader and point to the filterbank file + print(f"started at {time.strftime('%X')}") + + fb = await create_filterbank() + for i in range(0, 127): + row = await fb.next_row() + print(row) + + + # # read the data in the filterbank file + # f, samples = await fb.select_data() + # print(f) + + # print(samples) + + print(f"finished at {time.strftime('%X')}") + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) diff --git a/filterbank/filterbank.py b/filterbank/filterbank.py index ccae63b..b1fb222 100644 --- a/filterbank/filterbank.py +++ b/filterbank/filterbank.py @@ -4,6 +4,8 @@ import os import numpy as np +import asyncio +from aiofile import AIOFile, LineReader from .header import read_header, len_header @@ -53,11 +55,14 @@ def __init__(self, filename, freq_range=None, time_range=None, read_all=False): # number if stream iterations self.stream_iter = (self.n_samples * self.n_ifs) # read filterbank at once - if read_all: - self.read_filterbank() + # if read_all: + # self.read_filterbank() + async def _init(self): + print("t") + # await self.read_filterbank() - def read_filterbank(self): + async def read_filterbank(self): """ Read filterbank file and transform to tuple of 3 matrices: including the sample amount, number of intermediate channels @@ -79,7 +84,8 @@ def read_filterbank(self): self.fil.close() - def next_row(self): + async def next_row(self): + """ Read filterbank file per row @@ -93,13 +99,14 @@ def next_row(self): data = np.fromfile(self.fil, count=self.n_chans_selected, dtype=self.dd_type) # skip bytes till start of next chunk self.fil.seek(self.n_bytes * (self.n_chans - self.i_1), 1) + await asyncio.sleep(1) else: data = False self.fil.close() return data - def next_n_rows(self, n_rows): + async def next_n_rows(self, n_rows): """ Read filterbank per n rows @@ -187,7 +194,7 @@ def setup_chans(self, freq_range=None): return i_0, i_1 - def select_data(self, freq_start=None, freq_stop=None, time_start=None, time_stop=None): + async def select_data(self, freq_start=None, freq_stop=None, time_start=None, time_stop=None): """ Select a range of data from the filterbank file From d35fb02ea69e4ba9d64c5e22b447d0972f10cdec Mon Sep 17 00:00:00 2001 From: Brian Lochan Date: Sun, 1 Dec 2019 19:45:19 +0100 Subject: [PATCH 07/13] add asynchronous filterbank waterfall plot example --- ...synchronously_filterbank_waterfall_plot.py | 51 +++++++++++++++++++ filterbank/filterbank.py | 4 +- plot/waterfall.py | 8 +-- 3 files changed, 58 insertions(+), 5 deletions(-) create mode 100644 examples/asynchronously_filterbank_waterfall_plot.py diff --git a/examples/asynchronously_filterbank_waterfall_plot.py b/examples/asynchronously_filterbank_waterfall_plot.py new file mode 100644 index 0000000..cfca157 --- /dev/null +++ b/examples/asynchronously_filterbank_waterfall_plot.py @@ -0,0 +1,51 @@ +# pylint: disable-all +import os,sys,inspect + +import waterfall as waterfall + +CURRENT_DIR = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) +PARENT_DIR = os.path.dirname(CURRENT_DIR) +sys.path.insert(0,PARENT_DIR) +import matplotlib.animation as animation +from filterbank.header import read_header +from filterbank.filterbank import Filterbank +from plot import waterfall +import pylab as pyl +import matplotlib.pyplot as plt +from plot.plot import next_power_of_2 +import asyncio +import time + +async def create_waterfall(fb): + wf = waterfall.Waterfall(filter_bank=fb, fig=pyl.figure(), mode='stream') + await wf._init() + wf.init_plot() + + return wf + +async def create_filterbank(): + filterbank = Filterbank(filename='./pspm32.fil') + # await filterbank._init() + + return filterbank + +async def main(): + # fb = await create_filterbank() + fb = Filterbank(filename='./pspm32.fil') + + # wf = waterfall.Waterfall(filter_bank=fb, fig=pyl.figure(), mode='stream') + wf = await create_waterfall(fb) + + # fig, update, frames, repeat = await wf.animated_plotter() + + fig, update, frames, repeat = wf.animated_plotter() + + ani = animation.FuncAnimation(fig, update, frames=frames,repeat=repeat) + plt.show(block=True) + + +if __name__ == "__main__": + print(f"started at {time.strftime('%X')}") + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) + print(f"finished at {time.strftime('%X')}") diff --git a/filterbank/filterbank.py b/filterbank/filterbank.py index b1fb222..bbf4387 100644 --- a/filterbank/filterbank.py +++ b/filterbank/filterbank.py @@ -84,7 +84,7 @@ async def read_filterbank(self): self.fil.close() - async def next_row(self): + def next_row(self): """ Read filterbank file per row @@ -99,7 +99,6 @@ async def next_row(self): data = np.fromfile(self.fil, count=self.n_chans_selected, dtype=self.dd_type) # skip bytes till start of next chunk self.fil.seek(self.n_bytes * (self.n_chans - self.i_1), 1) - await asyncio.sleep(1) else: data = False self.fil.close() @@ -123,6 +122,7 @@ async def next_n_rows(self, n_rows): data[row] = np.fromfile(self.fil, count=self.n_chans_selected, dtype=self.dd_type) # skip bytes till start of next chunk self.fil.seek(self.n_bytes * (self.n_chans - self.i_1), 1) + # await asyncio.sleep(0.1) else: data = False self.fil.close() diff --git a/plot/waterfall.py b/plot/waterfall.py index b2788d5..eb4f7e8 100644 --- a/plot/waterfall.py +++ b/plot/waterfall.py @@ -14,7 +14,7 @@ class Waterfall(): # All these attributes are needed. def __init__(self, filter_bank=None, center_freq=None, sample_freq=None, fig=None, scans_per_sweep=None, - max_n_rows=1024, mode='stream', t_obs=None): + max_n_rows=128, mode='stream', t_obs=None): """ Setup waterfall object """ @@ -46,12 +46,14 @@ def __init__(self, filter_bank=None, center_freq=None, sample_freq=None, time_stop=time_stop) else: freqs = filter_bank.get_freqs() - self.samples = self.filter_bank.next_n_rows(self.max_n_rows) self.freqs = np.asarray(freqs) - self.init_plot() + # self.init_plot() + + async def _init(self): + self.samples = await self.filter_bank.next_n_rows(self.max_n_rows) def init_plot(self): """ From 1a579f3216e9102415b7484b618474145067292e Mon Sep 17 00:00:00 2001 From: Brian Lochan Date: Thu, 5 Dec 2019 16:08:16 +0100 Subject: [PATCH 08/13] made asynchronous test and fix --- ...synchronously_filterbank_waterfall_plot.py | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/examples/asynchronously_filterbank_waterfall_plot.py b/examples/asynchronously_filterbank_waterfall_plot.py index cfca157..e23103f 100644 --- a/examples/asynchronously_filterbank_waterfall_plot.py +++ b/examples/asynchronously_filterbank_waterfall_plot.py @@ -20,32 +20,33 @@ async def create_waterfall(fb): wf = waterfall.Waterfall(filter_bank=fb, fig=pyl.figure(), mode='stream') await wf._init() wf.init_plot() - return wf -async def create_filterbank(): +async def create_filterbank(test_async=None): filterbank = Filterbank(filename='./pspm32.fil') + + # just a async test to see if it works + if test_async is not None: + await asyncio.sleep(1) + print(test_async) + else: + print("1") + + # await filterbank._init() return filterbank async def main(): - # fb = await create_filterbank() - fb = Filterbank(filename='./pspm32.fil') + # creates fb2 first and then fb1, because of sleep delay in create_filterbank method + fb1, fb2 = await asyncio.gather(create_filterbank("2"), create_filterbank()) + wf1, wf2 = await asyncio.gather(create_waterfall(fb1), create_waterfall(fb2)) - # wf = waterfall.Waterfall(filter_bank=fb, fig=pyl.figure(), mode='stream') - wf = await create_waterfall(fb) - - # fig, update, frames, repeat = await wf.animated_plotter() - - fig, update, frames, repeat = wf.animated_plotter() + fig, update, frames, repeat = wf1.animated_plotter() ani = animation.FuncAnimation(fig, update, frames=frames,repeat=repeat) plt.show(block=True) - if __name__ == "__main__": - print(f"started at {time.strftime('%X')}") loop = asyncio.get_event_loop() loop.run_until_complete(main()) - print(f"finished at {time.strftime('%X')}") From 2b33cd665e3fefa03bd6af910fb5d89c14d80af4 Mon Sep 17 00:00:00 2001 From: Brian Lochan Date: Thu, 5 Dec 2019 16:13:15 +0100 Subject: [PATCH 09/13] plot fix for asynchronously_filterbank --- examples/asynchronously_filterbank_waterfall_plot.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/asynchronously_filterbank_waterfall_plot.py b/examples/asynchronously_filterbank_waterfall_plot.py index e23103f..8699b22 100644 --- a/examples/asynchronously_filterbank_waterfall_plot.py +++ b/examples/asynchronously_filterbank_waterfall_plot.py @@ -31,8 +31,6 @@ async def create_filterbank(test_async=None): print(test_async) else: print("1") - - # await filterbank._init() return filterbank @@ -43,8 +41,10 @@ async def main(): wf1, wf2 = await asyncio.gather(create_waterfall(fb1), create_waterfall(fb2)) fig, update, frames, repeat = wf1.animated_plotter() - ani = animation.FuncAnimation(fig, update, frames=frames,repeat=repeat) + + fig2, update2, frames2, repeat2 = wf2.animated_plotter() + ani2 = animation.FuncAnimation(fig2, update2, frames=frames2, repeat=repeat2) plt.show(block=True) if __name__ == "__main__": From c1c99be9dd4ee767de22a25acb0e64180f8a1eba Mon Sep 17 00:00:00 2001 From: Brian Lochan Date: Tue, 10 Dec 2019 12:33:49 +0100 Subject: [PATCH 10/13] Made asynchronous and synchronous filterbank seperately --- ...synchronously_filterbank_waterfall_plot.py | 40 ++- filterbank/async_filterbank.py | 232 ++++++++++++++++++ filterbank/filterbank.py | 15 +- plot/waterfall.py | 18 +- 4 files changed, 281 insertions(+), 24 deletions(-) create mode 100644 filterbank/async_filterbank.py diff --git a/examples/asynchronously_filterbank_waterfall_plot.py b/examples/asynchronously_filterbank_waterfall_plot.py index 8699b22..1c59f43 100644 --- a/examples/asynchronously_filterbank_waterfall_plot.py +++ b/examples/asynchronously_filterbank_waterfall_plot.py @@ -7,35 +7,61 @@ PARENT_DIR = os.path.dirname(CURRENT_DIR) sys.path.insert(0,PARENT_DIR) import matplotlib.animation as animation -from filterbank.header import read_header -from filterbank.filterbank import Filterbank +from filterbank.async_filterbank import AsyncFilterbank from plot import waterfall import pylab as pyl import matplotlib.pyplot as plt -from plot.plot import next_power_of_2 import asyncio -import time + async def create_waterfall(fb): - wf = waterfall.Waterfall(filter_bank=fb, fig=pyl.figure(), mode='stream') + ''' + creates an asynchronous waterfall object which needs a filterbank object where it initializes a plot and returns + this object + + @param fb: A filterbank file for the plot + @type fb: Filterbank + @return: a waterplot object + @rtype: Waterfall + + ''' + + wf = waterfall.Waterfall(filter_bank=fb, fig=pyl.figure(), mode='stream', sync=False) await wf._init() wf.init_plot() return wf async def create_filterbank(test_async=None): - filterbank = Filterbank(filename='./pspm32.fil') + + ''' + creates an asyncronous Filterbank file. Also it checks if the the function is actually asynchronous. + @param test_async: checks to so if the method is asynchronous. + + @type test_async: String + @return: A filterbank object + @rtype: Filterbank + ''' + + filterbank = AsyncFilterbank(filename='./pspm32.fil') # just a async test to see if it works if test_async is not None: + # if the following line is commented out then it will first create fb1 and then fb2. See main() for the comment. await asyncio.sleep(1) print(test_async) else: print("1") - # await filterbank._init() return filterbank async def main(): + ''' + creates streaming waterfall plot by getting a filterbank file as an input. + + @return: shows streaming plot. You can use "ipython" (search it on Google or go to: + https://ipython.org/install.html) to get an animated plot + ''' + # creates fb2 first and then fb1, because of sleep delay in create_filterbank method fb1, fb2 = await asyncio.gather(create_filterbank("2"), create_filterbank()) wf1, wf2 = await asyncio.gather(create_waterfall(fb1), create_waterfall(fb2)) diff --git a/filterbank/async_filterbank.py b/filterbank/async_filterbank.py new file mode 100644 index 0000000..0054c92 --- /dev/null +++ b/filterbank/async_filterbank.py @@ -0,0 +1,232 @@ +""" + Utilities for reading data from filterbank file +""" + +import os +import numpy as np +import asyncio +from aiofile import AIOFile, LineReader +from .header import read_header, len_header + + +class AsyncFilterbank: + """ + Processing .fil files + """ + + # pylint: disable=too-many-instance-attributes + + def __init__(self, filename, freq_range=None, time_range=None, read_all=False): + """ + Initialize Filterbank object + + Args: + freq_range, tuple of freq_start and freq_stop in MHz + time_range, tuple of time_start and time_stop + read_all, whether to read all data at once + """ + if not os.path.isfile(filename): + raise FileNotFoundError(filename) + # header values + self.data, self.freqs, self.n_chans_selected = None, None, None + self.filename = filename + self.header = read_header(filename) + self.idx_data = len_header(filename) + self.n_bytes = self.header[b'nbytes'] + self.n_chans = self.header[b'nchans'] + self.n_ifs = self.header[b'nifs'] + # decide appropriate datatype + if self.n_bytes == 4: + self.dd_type = b'float32' + elif self.n_bytes == 2: + self.dd_type = b'uint16' + elif self.n_bytes == 1: + self.dd_type = b'uint8' + # open filterbank file + self.fil = open(self.filename, 'rb') + # skip the header bytes + self.fil.seek(self.idx_data) + # find possible time range + self.ii_start, self.n_samples = self.setup_time(time_range) + # search for start of data + self.fil.seek(int(self.ii_start * self.n_bytes * self.n_ifs * self.n_chans), 1) + # find possible channels + self.i_0, self.i_1 = self.setup_chans(freq_range) + # number if stream iterations + self.stream_iter = (self.n_samples * self.n_ifs) + # read filterbank at once + if read_all: + #doesn't keep file open + self.read_filterbank() + + async def read_filterbank(self): + """ + Read filterbank file and transform to tuple of 3 matrices: + including the sample amount, number of intermediate channels + and the amount of selected frequencies/channels + """ + # set shape of data + self.data = np.zeros((self.n_samples, self.n_ifs, self.n_chans_selected), + dtype=self.dd_type) + # read for each time sample the intensity per frequency + for i_i in range(self.n_samples): + for j_j in range(self.n_ifs): + self.fil.seek(self.n_bytes * self.i_0, 1) + # add to matrix + self.data[i_i, j_j] = np.fromfile(self.fil, count=self.n_chans_selected, + dtype=self.dd_type) + # skip bytes till start of next chunk + self.fil.seek(self.n_bytes * (self.n_chans - self.i_1), 1) + # release file resources + self.fil.close() + + + def next_row(self): + + """ + Read filterbank file per row + + returns False if EOF + """ + if self.stream_iter > 0: + self.stream_iter -= 1 + # skip bytes + self.fil.seek(self.n_bytes * self.i_0, 1) + # read row of data + data = np.fromfile(self.fil, count=self.n_chans_selected, dtype=self.dd_type) + # skip bytes till start of next chunk + self.fil.seek(self.n_bytes * (self.n_chans - self.i_1), 1) + else: + data = False + self.fil.close() + return data + + + async def next_n_rows(self, n_rows): + """ + Read filterbank per n rows + + returns False if EOF + """ + if self.stream_iter - n_rows > 0: + self.stream_iter -= n_rows + # init array of n rows + data = np.zeros((n_rows, self.n_chans_selected), dtype=self.dd_type) + for row in range(n_rows): + # skip bytes + self.fil.seek(self.n_bytes * self.i_0, 1) + # read row of data + data[row] = np.fromfile(self.fil, count=self.n_chans_selected, dtype=self.dd_type) + # skip bytes till start of next chunk + self.fil.seek(self.n_bytes * (self.n_chans - self.i_1), 1) + # await asyncio.sleep(0.1) + else: + data = False + self.fil.close() + + return data + + + def setup_freqs(self, freq_range=None): + """ + Calculate the frequency range + """ + f_delt = self.header[b'foff'] + f_0 = self.header[b'fch1'] + i_start, i_stop = 0, self.n_chans + # frequency range is specified + if freq_range: + if freq_range[0]: + i_start = int((freq_range[0] - f_0) / f_delt) + if freq_range[1]: + i_stop = int((freq_range[1] - f_0) / f_delt) + chan_start_idx = np.int(i_start) + chan_stop_idx = np.int(i_stop) + # create evenly spaced interval for frequencies + if i_start < i_stop: + i_vals = np.arange(chan_start_idx, chan_stop_idx) + else: + i_vals = np.arange(chan_stop_idx, chan_start_idx) + # calculate all possible frequencies + self.freqs = f_delt * i_vals + f_0 + # amount of channels + self.n_chans_selected = self.freqs.shape[0] + # change channel order if reversed + if chan_stop_idx < chan_start_idx: + chan_stop_idx, chan_start_idx = chan_start_idx, chan_stop_idx + return chan_start_idx, chan_stop_idx + + + def setup_time(self, time_range=None): + """ + Calculate the time range + """ + t_delt = self.header[b'tsamp'] + t_0 = self.header[b'tstart'] + # calculate amount of bytes in file without header + n_bytes_data = os.path.getsize(self.filename) - self.idx_data + # calculate sample size + ii_start, ii_stop = 0, int(n_bytes_data / (self.n_bytes * self.n_chans * self.n_ifs)) + # time range is specified + if time_range: + if isinstance(time_range[0], int): + ii_start = time_range[0] + if isinstance(time_range[1], int): + ii_stop = time_range[1] + n_samples = ii_stop - ii_start + # calculate all possible time stamps + self.timestamps = np.arange(0, n_samples) * t_delt / 24. / 60. / 60. + t_0 + return ii_start, n_samples + + + def setup_chans(self, freq_range=None): + """ + Calculate the channel range + """ + chan_start_idx, chan_stop_idx = self.setup_freqs(freq_range) + # set lowest value + i_0 = np.min((chan_start_idx, chan_stop_idx)) + # set highest value + i_1 = np.max((chan_start_idx, chan_stop_idx)) + return i_0, i_1 + + + async def select_data(self, freq_start=None, freq_stop=None, time_start=None, time_stop=None): + """ + Select a range of data from the filterbank file + + time_start and time_start, can be a float that represents a sampling interval in s + """ + # find indices for sampling interval + if isinstance(time_start, float): + time_start = np.searchsorted(self.timestamps, (time_start / 1e5) + 50000) + if isinstance(time_stop, float): + time_stop = np.searchsorted(self.timestamps, (time_stop / 1e5) + 50000) + # if no frequency range is specified, select all frequencies + if freq_start is None: + freq_start = self.freqs[0] + if freq_stop is None: + freq_stop = self.freqs[-1] + # give indices of minimum and maximum frequency + i_0 = np.argmin(np.abs(self.freqs - freq_start)) + i_1 = np.argmin(np.abs(self.freqs - freq_stop)) + # reverse data if frequencies are reversed + if i_0 < i_1: + freq_data = self.freqs[i_0:i_1 + 1] + fil_data = np.squeeze(self.data[time_start:time_stop, ..., i_0:i_1 + 1]) + else: + freq_data = self.freqs[i_1:i_0 + 1] + fil_data = np.squeeze(self.data[time_start:time_stop, ..., i_1:i_0 + 1]) + return freq_data, fil_data + + def get_freqs(self): + """ + Returns the frequencies + """ + return self.freqs + + def get_header(self): + """ + Return a dictionary of the filterbank header + """ + return self.header diff --git a/filterbank/filterbank.py b/filterbank/filterbank.py index bbf4387..cca8f35 100644 --- a/filterbank/filterbank.py +++ b/filterbank/filterbank.py @@ -55,14 +55,11 @@ def __init__(self, filename, freq_range=None, time_range=None, read_all=False): # number if stream iterations self.stream_iter = (self.n_samples * self.n_ifs) # read filterbank at once - # if read_all: - # self.read_filterbank() + if read_all: + # doesn't keep file open + self.read_filterbank() - async def _init(self): - print("t") - # await self.read_filterbank() - - async def read_filterbank(self): + def read_filterbank(self): """ Read filterbank file and transform to tuple of 3 matrices: including the sample amount, number of intermediate channels @@ -105,7 +102,7 @@ def next_row(self): return data - async def next_n_rows(self, n_rows): + def next_n_rows(self, n_rows): """ Read filterbank per n rows @@ -194,7 +191,7 @@ def setup_chans(self, freq_range=None): return i_0, i_1 - async def select_data(self, freq_start=None, freq_stop=None, time_start=None, time_stop=None): + def select_data(self, freq_start=None, freq_stop=None, time_start=None, time_stop=None): """ Select a range of data from the filterbank file diff --git a/plot/waterfall.py b/plot/waterfall.py index eb4f7e8..42168fd 100644 --- a/plot/waterfall.py +++ b/plot/waterfall.py @@ -12,9 +12,9 @@ class Waterfall(): # pylint: disable=R0913 # All these attributes are needed. - def __init__(self, filter_bank=None, center_freq=None, sample_freq=None, - fig=None, scans_per_sweep=None, - max_n_rows=128, mode='stream', t_obs=None): + def __init__(self, filter_bank: object = None, center_freq: object = None, sample_freq: object = None, + fig: object = None, scans_per_sweep: object = None, + max_n_rows: object = 128, mode: object = 'stream', t_obs: object = None, sync: object = True) -> object: """ Setup waterfall object """ @@ -38,21 +38,23 @@ def __init__(self, filter_bank=None, center_freq=None, sample_freq=None, self.center_freq = center_freq self.scans_per_sweep = scans_per_sweep - + # freqs = None if mode == "discrete": time_start = 0 time_stop = int(self.t_obs//self.header[b'tsamp']) freqs, self.samples = filter_bank.select_data(time_start=time_start, time_stop=time_stop) else: - freqs = filter_bank.get_freqs() + if sync is not False: + self.filter_bank.next_n_rows(self.max_n_rows) + freqs = filter_bank.get_freqs() self.freqs = np.asarray(freqs) + if sync is not False: + self.init_plot() - # self.init_plot() - - async def _init(self): + async def _init(self) -> object: self.samples = await self.filter_bank.next_n_rows(self.max_n_rows) def init_plot(self): From 17c4d6da45f5a261449149c8270c9af476cfd713 Mon Sep 17 00:00:00 2001 From: Brian Lochan Date: Tue, 10 Dec 2019 12:56:36 +0100 Subject: [PATCH 11/13] Made asynchronous filterbank and waterfall_plot more modular --- ...synchronously_filterbank_waterfall_plot.py | 50 +++---------------- filterbank/create_async_filterbank.py | 40 +++++++++++++++ plot/create_async_waterfall_plot.py | 28 +++++++++++ 3 files changed, 75 insertions(+), 43 deletions(-) create mode 100644 filterbank/create_async_filterbank.py create mode 100644 plot/create_async_waterfall_plot.py diff --git a/examples/asynchronously_filterbank_waterfall_plot.py b/examples/asynchronously_filterbank_waterfall_plot.py index 1c59f43..342f4c9 100644 --- a/examples/asynchronously_filterbank_waterfall_plot.py +++ b/examples/asynchronously_filterbank_waterfall_plot.py @@ -12,47 +12,8 @@ import pylab as pyl import matplotlib.pyplot as plt import asyncio - - -async def create_waterfall(fb): - ''' - creates an asynchronous waterfall object which needs a filterbank object where it initializes a plot and returns - this object - - @param fb: A filterbank file for the plot - @type fb: Filterbank - @return: a waterplot object - @rtype: Waterfall - - ''' - - wf = waterfall.Waterfall(filter_bank=fb, fig=pyl.figure(), mode='stream', sync=False) - await wf._init() - wf.init_plot() - return wf - -async def create_filterbank(test_async=None): - - ''' - creates an asyncronous Filterbank file. Also it checks if the the function is actually asynchronous. - @param test_async: checks to so if the method is asynchronous. - - @type test_async: String - @return: A filterbank object - @rtype: Filterbank - ''' - - filterbank = AsyncFilterbank(filename='./pspm32.fil') - - # just a async test to see if it works - if test_async is not None: - # if the following line is commented out then it will first create fb1 and then fb2. See main() for the comment. - await asyncio.sleep(1) - print(test_async) - else: - print("1") - - return filterbank +from filterbank.create_async_filterbank import CreateAsyncFilterbank +from plot.create_async_waterfall_plot import CreateAsyncWaterfallPlot async def main(): ''' @@ -62,9 +23,12 @@ async def main(): https://ipython.org/install.html) to get an animated plot ''' + afb = CreateAsyncFilterbank() + awf = CreateAsyncWaterfallPlot() + # creates fb2 first and then fb1, because of sleep delay in create_filterbank method - fb1, fb2 = await asyncio.gather(create_filterbank("2"), create_filterbank()) - wf1, wf2 = await asyncio.gather(create_waterfall(fb1), create_waterfall(fb2)) + fb1, fb2 = await asyncio.gather(afb.create_filterbank("./pspm32.fil", "2"), afb.create_filterbank("./pspm32.fil")) + wf1, wf2 = await asyncio.gather(awf.create_waterfall(fb1), awf.create_waterfall(fb2)) fig, update, frames, repeat = wf1.animated_plotter() ani = animation.FuncAnimation(fig, update, frames=frames,repeat=repeat) diff --git a/filterbank/create_async_filterbank.py b/filterbank/create_async_filterbank.py new file mode 100644 index 0000000..0cab4ca --- /dev/null +++ b/filterbank/create_async_filterbank.py @@ -0,0 +1,40 @@ +# pylint: disable-all +import os,sys,inspect + +import waterfall as waterfall + +CURRENT_DIR = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) +PARENT_DIR = os.path.dirname(CURRENT_DIR) +sys.path.insert(0,PARENT_DIR) +import matplotlib.animation as animation +from filterbank.async_filterbank import AsyncFilterbank +from plot import waterfall +import pylab as pyl +import matplotlib.pyplot as plt +import asyncio + +class CreateAsyncFilterbank: + + async def create_filterbank(self, filename="./pspm32.fil", test_async=None): + + ''' + creates an asyncronous Filterbank file. Also it checks if the the function is actually asynchronous. + @param test_async: checks to so if the method is asynchronous. + + @type test_async: String + @return: A filterbank object + @rtype: Filterbank + ''' + + filterbank = AsyncFilterbank(filename=filename) + + # just a async test to see if it works + if test_async is not None: + # if the following line is commented out then it will first create fb1 and then fb2. See main() in + # asynchronously_filterbank_waterfall_plot.py for the comment. + await asyncio.sleep(1) + print(test_async) + else: + print("1") + + return filterbank diff --git a/plot/create_async_waterfall_plot.py b/plot/create_async_waterfall_plot.py new file mode 100644 index 0000000..376c2b2 --- /dev/null +++ b/plot/create_async_waterfall_plot.py @@ -0,0 +1,28 @@ +# pylint: disable-all +import os,sys,inspect + +import waterfall as waterfall + +CURRENT_DIR = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) +PARENT_DIR = os.path.dirname(CURRENT_DIR) +sys.path.insert(0,PARENT_DIR) +from plot import waterfall +import pylab as pyl + +class CreateAsyncWaterfallPlot: + + async def create_waterfall(self, fb): + ''' + creates an asynchronous waterfall object which needs a filterbank object where it initializes a plot and returns + this object + + @param fb: A filterbank file for the plot + @type fb: Filterbank + @return: a waterplot object + @rtype: Waterfall + ''' + + wf = waterfall.Waterfall(filter_bank=fb, fig=pyl.figure(), mode='stream', sync=False) + await wf._init() + wf.init_plot() + return wf \ No newline at end of file From c9f050d28ccbbee5285bd3be9f5fc29ea4b01f59 Mon Sep 17 00:00:00 2001 From: Brian Lochan Date: Tue, 10 Dec 2019 14:25:01 +0100 Subject: [PATCH 12/13] Created an input stream server for filterbankfiles --- examples/input_stream_filterbank_server.py | 67 ++++++++++++++++++++++ examples/testStreamServer.py | 30 ---------- 2 files changed, 67 insertions(+), 30 deletions(-) create mode 100644 examples/input_stream_filterbank_server.py delete mode 100644 examples/testStreamServer.py diff --git a/examples/input_stream_filterbank_server.py b/examples/input_stream_filterbank_server.py new file mode 100644 index 0000000..564d907 --- /dev/null +++ b/examples/input_stream_filterbank_server.py @@ -0,0 +1,67 @@ +import asyncio +from filterbank.filterbank import Filterbank +import os,sys,inspect +currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) +parentdir = os.path.dirname(currentdir) +sys.path.insert(0,parentdir) +import numpy as np +import pickle + +async def print_filterbank_data(reader, writer): + ''' + Reads and write data to the client. Also gets filterbank and gives data it contains back to the client. + @param reader: reads input from sys.stdin + @type reader: Reader + @param writer: writes data to server + @type writer: Writer + @return: prints Filterbank data + ''' + + data = await reader.read(100) + print(data) + message = data.decode().rstrip("\n\r") + addr = writer.get_extra_info('peername') + + # Instatiate the filterbank reader and point to the filterbank file + fb = Filterbank(filename="./"+message, read_all=True) + + # read the data in the filterbank file + f, samples = fb.select_data() + + # Get the powerlevels and the frequencies + for i in range(len(samples)): + # for j in range(len(samples[i])): + # print(samples[i][j]) + writer.write((str(samples[i]).encode())) + await writer.drain() + # await asyncio.sleep(1) + + print(f"Got file {message!r} from {addr!r}") + + # writer.write(data) + + await print_filterbank_data(reader, writer) + + # print("Close the connection") + # writer.close() + +async def main(): + + ''' + Asynchronous Input stream server for filterbank data. Input is a filterbank filename without "./". + Output is printed filterbank data. When this server is running, go to commandline and type: telnet 127.0.0.1 8080 + if it is connected then write filename like: pspm32.fil + @return: Server on localhost + ''' + + server = await asyncio.start_server( + print_filterbank_data, '127.0.0.1', 8080) + + addr = server.sockets[0].getsockname() + print(f'Serving on {addr}') + + + async with server: + await server.serve_forever() + +asyncio.run(main()) \ No newline at end of file diff --git a/examples/testStreamServer.py b/examples/testStreamServer.py deleted file mode 100644 index f3aea4a..0000000 --- a/examples/testStreamServer.py +++ /dev/null @@ -1,30 +0,0 @@ -import asyncio - -async def handle_echo(reader, writer): - data = await reader.read(100) - message = data.decode() - addr = writer.get_extra_info('peername') - - print(f"Received {message!r} from {addr!r}") - - print(f"Send: {message!r}") - writer.write(data) - await writer.drain() - - await handle_echo(reader, writer) - - # print("Close the connection") - # writer.close() - -async def main(): - server = await asyncio.start_server( - handle_echo, '127.0.0.1', 8080) - - addr = server.sockets[0].getsockname() - print(f'Serving on {addr}') - - - async with server: - await server.serve_forever() - -asyncio.run(main()) \ No newline at end of file From 4e935aa03bfd454aa485184a25daf0d379792048 Mon Sep 17 00:00:00 2001 From: Brian Lochan Date: Thu, 12 Dec 2019 18:00:25 +0100 Subject: [PATCH 13/13] Made server more user friendly and fixed remote connection via telnet --- examples/input_stream_filterbank_server.py | 66 ++++++++++++++-------- 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/examples/input_stream_filterbank_server.py b/examples/input_stream_filterbank_server.py index 564d907..033472c 100644 --- a/examples/input_stream_filterbank_server.py +++ b/examples/input_stream_filterbank_server.py @@ -4,10 +4,9 @@ currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) parentdir = os.path.dirname(currentdir) sys.path.insert(0,parentdir) -import numpy as np -import pickle +import os -async def print_filterbank_data(reader, writer): +async def print_filterbank_data(reader, writer, not_connected=True): ''' Reads and write data to the client. Also gets filterbank and gives data it contains back to the client. @param reader: reads input from sys.stdin @@ -17,45 +16,62 @@ async def print_filterbank_data(reader, writer): @return: prints Filterbank data ''' + commands = "Type 'pspm32.fil' or 'pspm16.fil' or 'pspm8.fil' " \ + "for filterbank data\nType '!quit' to close the connection\n" + + if not_connected: + not_connected = False + print("device connected") + writer.write((("Welcome to the server!\n\n"+commands).encode())) + await writer.drain() + data = await reader.read(100) - print(data) - message = data.decode().rstrip("\n\r") - addr = writer.get_extra_info('peername') - # Instatiate the filterbank reader and point to the filterbank file - fb = Filterbank(filename="./"+message, read_all=True) + if data.decode().rstrip("\n\r") == "!quit": + print("Close the connection") + writer.close() - # read the data in the filterbank file - f, samples = fb.select_data() + else: + print("Received: " + data.decode()) + message = data.decode().rstrip("\n\r") - # Get the powerlevels and the frequencies - for i in range(len(samples)): - # for j in range(len(samples[i])): - # print(samples[i][j]) - writer.write((str(samples[i]).encode())) - await writer.drain() - # await asyncio.sleep(1) + if message == "pspm32.fil" or message == "pspm16.fil" or message == "pspm8.fil": + addr = writer.get_extra_info('peername') + + # Instatiate the filterbank reader and point to the filterbank file + fb = Filterbank(filename="./" + message, read_all=True) + + # read the data in the filterbank file + f, samples = fb.select_data() - print(f"Got file {message!r} from {addr!r}") + # Get the powerlevels and the frequencies + for i in range(len(samples)): + # for j in range(len(samples[i])): + # print(samples[i][j]) + writer.write((str(samples[i]).encode())) + writer.write((str("\n\n").encode())) + await writer.drain() + # await asyncio.sleep(1) - # writer.write(data) + print(f"Got file {message!r} from {addr!r}") - await print_filterbank_data(reader, writer) + else: + writer.write(("'" + message + "'" + " is not accepted. \n" + commands).encode()) - # print("Close the connection") - # writer.close() + await print_filterbank_data(reader, writer, not_connected) async def main(): ''' Asynchronous Input stream server for filterbank data. Input is a filterbank filename without "./". - Output is printed filterbank data. When this server is running, go to commandline and type: telnet 127.0.0.1 8080 - if it is connected then write filename like: pspm32.fil + Output is printed filterbank data. When this server is running, go to commandline and type: + telnet 12345 if it is connected then write filename like: pspm32.fil @return: Server on localhost ''' + local_ip = os.popen("ipconfig getifaddr en0").read().strip('\n') server = await asyncio.start_server( - print_filterbank_data, '127.0.0.1', 8080) + print_filterbank_data, local_ip, 12345) addr = server.sockets[0].getsockname() print(f'Serving on {addr}')