Skip to content

Commit

Permalink
Merge pull request #22 from ronaldokun/parallel
Browse files Browse the repository at this point in the history
⚡  Added parallelization in block creation
  • Loading branch information
ronaldokun authored Oct 31, 2021
2 parents 3401059 + bbf5523 commit be89490
Show file tree
Hide file tree
Showing 11 changed files with 536 additions and 482 deletions.
153 changes: 76 additions & 77 deletions README.md

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions docs/constants.html
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ <h3 id="Variables-Used-in-parser.py">Variables Used in <code>parser.py</code><a
<span class="mi">7</span><span class="p">:</span> <span class="p">(</span>
<span class="s2">&quot;type&quot;</span><span class="p">,</span>
<span class="s2">&quot;thread_id&quot;</span><span class="p">,</span>
<span class="s2">&quot;thresh&quot;</span><span class="p">,</span>
<span class="s2">&quot;start_mega&quot;</span><span class="p">,</span>
<span class="s2">&quot;stop_mega&quot;</span><span class="p">,</span>
<span class="s2">&quot;namal&quot;</span><span class="p">,</span>
Expand Down Expand Up @@ -279,6 +280,7 @@ <h3 id="Variables-Used-in-parser.py">Variables Used in <code>parser.py</code><a
<span class="mi">61</span><span class="p">:</span> <span class="p">(</span>
<span class="s2">&quot;type&quot;</span><span class="p">,</span>
<span class="s2">&quot;thread_id&quot;</span><span class="p">,</span>
<span class="s2">&quot;thresh&quot;</span><span class="p">,</span>
<span class="s2">&quot;start_mega&quot;</span><span class="p">,</span>
<span class="s2">&quot;stop_mega&quot;</span><span class="p">,</span>
<span class="s2">&quot;ndata&quot;</span><span class="p">,</span>
Expand Down Expand Up @@ -310,6 +312,7 @@ <h3 id="Variables-Used-in-parser.py">Variables Used in <code>parser.py</code><a
<span class="mi">64</span><span class="p">:</span> <span class="p">(</span>
<span class="s2">&quot;type&quot;</span><span class="p">,</span>
<span class="s2">&quot;thread_id&quot;</span><span class="p">,</span>
<span class="s2">&quot;thresh&quot;</span><span class="p">,</span>
<span class="s2">&quot;description&quot;</span><span class="p">,</span>
<span class="s2">&quot;start_mega&quot;</span><span class="p">,</span>
<span class="s2">&quot;stop_mega&quot;</span><span class="p">,</span>
Expand Down Expand Up @@ -344,6 +347,7 @@ <h3 id="Variables-Used-in-parser.py">Variables Used in <code>parser.py</code><a
<span class="mi">68</span><span class="p">:</span> <span class="p">(</span>
<span class="s2">&quot;type&quot;</span><span class="p">,</span>
<span class="s2">&quot;thread_id&quot;</span><span class="p">,</span>
<span class="s2">&quot;thresh&quot;</span><span class="p">,</span>
<span class="s2">&quot;description&quot;</span><span class="p">,</span>
<span class="s2">&quot;start_mega&quot;</span><span class="p">,</span>
<span class="s2">&quot;stop_mega&quot;</span><span class="p">,</span>
Expand Down
155 changes: 77 additions & 78 deletions docs/index.html

Large diffs are not rendered by default.

84 changes: 43 additions & 41 deletions docs/parser.html

Large diffs are not rendered by default.

149 changes: 76 additions & 73 deletions nbs/01_parser.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -113,54 +113,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"## Processamento do Arquivo `.bin` e criação dos diferentes tipos de blocos\n",
"A função seguinte `parse_bin` recebe um arquivo `.bin` e mapeia os blocos contidos nele retornando um dicionário:\n",
" * `file_version`: Versão do arquivo `.bin`\n",
" * `blocks`: Dicionário com os blocos do arquivo `.bin`. Cada tipo de bloco tem sua Classe Própria\n",
" \n",
" O dicionário `blocks` retornado tem como chave uma tupla (tipo de bloco, `thread_id`) e os valores como uma lista com os blocos ( classes ) extraídos sequencialmente.\n",
"\n",
"O tipo de bloco é a natureza do dado contido, por exemplo: 40 - GPS, 67 - Dado Espectral. O `thread_id` discrimina em geral diferentes \"faixas\" do mesmo tipo de dado. Para dados espectrais, por exemplo, diferentes thread_id representam varreduras de faixas de frequência distintas"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#export\n",
"def parse_bin(bin_file: Union[str, Path]) -> dict:\n",
" \"\"\"Receives a CRFS binfile and returns a dictionary with the file metadata, a GPS Class and a list with the different Spectrum Classes\n",
" A block is a piece of the .bin file with a known start and end and that contains different types of information.\n",
" It has several fields: file_type, header, data and footer.\n",
" Each field has lengths and information defined in the documentation.\n",
" Args:\n",
" bin_file (Union[str, Path]): path to the bin file\n",
"\n",
" Returns:\n",
" Dictionary with the file metadata, file_version, string info, gps and spectrum blocks.\n",
" \"\"\"\n",
" bin_file = Path(bin_file)\n",
" with open(bin_file, mode=\"rb\") as bfile:\n",
" # The first block of the file is the header and is 36 bytes long.\n",
" header = bfile.read(BYTES_HEADER)\n",
" body = bfile.read()\n",
" meta = classify_blocks(body.split(ENDMARKER))\n",
" parsed = {\n",
" \"filename\": bin_file.name,\n",
" \"file_version\": bin2int(header[:4]),\n",
" \"string\": bin2str(header[4:]),\n",
" }\n",
" parsed.update(meta)\n",
" return parsed"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A função a seguir recebe os bytes lidos do arquivo `.bin` e mapeia esses bytes em diferentes classes de acordo com o tipo de bloco"
"## Processamento do Arquivo `.bin` e criação dos diferentes tipos de blocos"
]
},
{
Expand Down Expand Up @@ -209,6 +162,13 @@
" )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A função a seguir recebe os bytes lidos do arquivo `.bin` e mapeia esses bytes em diferentes classes de acordo com o tipo de bloco"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand All @@ -223,19 +183,60 @@
" \"\"\"\n",
" base_block = byte2base_block(byte_block)\n",
" if not base_block:\n",
" return None\n",
" return None, None\n",
" block_type = base_block.type\n",
" constructor = MAIN_BLOCKS.get(block_type)\n",
" if not constructor:\n",
" _ = logger.log(\n",
" \"INFO\", f\"This block type constructor is not implemented: {block_type}\"\n",
" )\n",
" return None\n",
" return None, None\n",
" block = constructor(base_block)\n",
" if getattr(block, \"gerror\", -1) != -1 or getattr(block, \"gps_status\", -1) == 0:\n",
" _ = logger.log(\"INFO\", f\"Block with error: {block_type}\")\n",
" return None # spectral or gps blocks with error\n",
" return block"
" return None, None # spectral or gps blocks with error\n",
" return getattrs(block, KEY_ATTRS.get(block.type)), block"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A função a seguir recebe os bytes lidos do arquivo `.bin` e mapeia esses bytes em diferentes classes de acordo com o tipo de bloco"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#export\n",
"def parse_bin(bin_file: Union[str, Path]) -> dict:\n",
" \"\"\"Receives a CRFS binfile and returns a dictionary with the file metadata, a GPS Class and a list with the different Spectrum Classes\n",
" A block is a piece of the .bin file with a known start and end and that contains different types of information.\n",
" It has several fields: file_type, header, data and footer.\n",
" Each field has lengths and information defined in the documentation.\n",
" Args:\n",
" bin_file (Union[str, Path]): path to the bin file\n",
"\n",
" Returns:\n",
" Dictionary with the file metadata, file_version, string info, gps and spectrum blocks.\n",
" \"\"\"\n",
" bin_file = Path(bin_file)\n",
" with open(bin_file, mode=\"rb\") as bfile:\n",
" # The first block of the file is the header and is 36 bytes long.\n",
" header = bfile.read(BYTES_HEADER)\n",
" body = bfile.read()\n",
" blocks = parallel(create_block, body.split(ENDMARKER), threadpool=True)\n",
" meta = classify_blocks(blocks)\n",
" parsed = {\n",
" \"filename\": bin_file.name,\n",
" \"file_version\": bin2int(header[:4]),\n",
" \"string\": bin2str(header[4:]),\n",
" }\n",
" parsed.update(meta)\n",
" return parsed"
]
},
{
Expand Down Expand Up @@ -310,12 +311,15 @@
" self._data = None\n",
" gc.collect()\n",
" return levels\n",
" \n",
" @cached\n",
" def frequencies(self)->np.ndarray:\n",
" return np.linspace(self.start_mega, self.stop_mega, num=self.ndata)\n",
"\n",
" def matrix(self):\n",
" \"\"\"Returns the matrix formed from the spectrum levels and timestamp\"\"\"\n",
" frequencies = np.linspace(self.start_mega, self.stop_mega, num=self.ndata)\n",
" index = self._timestamp if len(self._timestamp) == len(self) else None\n",
" data = pd.DataFrame(self.levels, index=index, columns=frequencies)\n",
" data = pd.DataFrame(self.levels, index=index, columns=self.frequencies)\n",
" data.columns.name = \"Frequencies\"\n",
" data.index.name = \"Time\"\n",
" return data"
Expand All @@ -328,22 +332,22 @@
"outputs": [],
"source": [
"#export\n",
"def check_block_exists(attrs, fluxos, block):\n",
"def check_block_exists(attrs, fluxos):\n",
" \"\"\"Receives a dict of attributes and check if its values exist as keys in fluxos, otherwise create one and set to CrfsSpectrum Class\"\"\"\n",
" values = tuple(attrs.values())\n",
" if values not in fluxos:\n",
" attributes = list(attrs.keys())\n",
" metavalues = list(values)\n",
" if hasattr(block, 'thresh'):\n",
" if 'thresh' not in attributes:\n",
" attributes.append('thresh')\n",
" metavalues.append(block.thresh)\n",
" if hasattr(block, 'minimum'):\n",
" if 'minimum' not in attributes:\n",
" attributes.append('minimum')\n",
" metavalues.append(block.minimum)\n",
" metadata = namedtuple(\"SpecData\", attributes)\n",
" fluxos[values] = CrfsSpectrum(metadata(*metavalues))\n",
" # attributes = list(attrs.keys())\n",
" # metavalues = list(values)\n",
" # if hasattr(block, 'thresh'):\n",
" # if 'thresh' not in attributes:\n",
" # attributes.append('thresh')\n",
" # metavalues.append(block.thresh)\n",
" # if hasattr(block, 'minimum'):\n",
" # if 'minimum' not in attributes:\n",
" # attributes.append('minimum')\n",
" # metavalues.append(block.minimum)\n",
" metadata = namedtuple(\"SpecData\", attrs.keys())\n",
" fluxos[values] = CrfsSpectrum(metadata(*attrs.values()))\n",
" return values, fluxos"
]
},
Expand All @@ -355,7 +359,7 @@
"source": [
"#export\n",
"def append_spec_data(attrs, fluxos, block)->None:\n",
" values, fluxos = check_block_exists(attrs, fluxos, block)\n",
" values, fluxos = check_block_exists(attrs, fluxos)\n",
" time = getattr(block, \"wallclock_datetime\", None)\n",
" data = getattr(block, 'levels', None)\n",
" if time is not None:\n",
Expand All @@ -368,16 +372,15 @@
" meta = {}\n",
" fluxos = {}\n",
" gps = CrfsGPS()\n",
" for byte_block in byte_blocks:\n",
" block = create_block(byte_block)\n",
" for attrs, block in byte_blocks:\n",
" if not block:\n",
" continue\n",
" if block.type == 40:\n",
" dtype = block.type\n",
" if dtype == 40:\n",
" for k in BLOCK_ATTRS.get(40, []):\n",
" getattr(gps, f\"_{k}\").append(getattr(block, k))\n",
" continue\n",
" attrs = getattrs(block, attrs=KEY_ATTRS.get(block.type))\n",
" if block.type in VECTOR_BLOCKS:\n",
" if dtype in VECTOR_BLOCKS:\n",
" append_spec_data(attrs, fluxos, block)\n",
" else:\n",
" meta.update(attrs)\n",
Expand Down
7 changes: 6 additions & 1 deletion nbs/04_constants.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@
" 7: (\n",
" \"type\",\n",
" \"thread_id\",\n",
" \"thresh\",\n",
" \"start_mega\",\n",
" \"stop_mega\",\n",
" \"namal\",\n",
Expand Down Expand Up @@ -275,6 +276,7 @@
" 61: (\n",
" \"type\",\n",
" \"thread_id\",\n",
" \"thresh\", \n",
" \"start_mega\",\n",
" \"stop_mega\",\n",
" \"ndata\",\n",
Expand Down Expand Up @@ -306,6 +308,7 @@
" 64: (\n",
" \"type\",\n",
" \"thread_id\",\n",
" \"thresh\",\n",
" \"description\",\n",
" \"start_mega\",\n",
" \"stop_mega\",\n",
Expand Down Expand Up @@ -340,6 +343,7 @@
" 68: (\n",
" \"type\",\n",
" \"thread_id\",\n",
" \"thresh\",\n",
" \"description\",\n",
" \"start_mega\",\n",
" \"stop_mega\",\n",
Expand Down Expand Up @@ -443,7 +447,8 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.7.10 64-bit ('rfpye': conda)",
"display_name": "Python 3",
"language": "python",
"name": "python3"
}
},
Expand Down
Loading

0 comments on commit be89490

Please sign in to comment.