-
Notifications
You must be signed in to change notification settings - Fork 144
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create a new Buffer class that can flexibly combine the functions of current RingBuffer, Read, and Write. #771
base: main
Are you sure you want to change the base?
Changes from 1 commit
050a68d
8d04149
f4ea24a
286f13b
5222288
ff1c1ec
30033f4
0a3f28d
bb893a9
7484c93
815a1de
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
# Copyright (C) 2021-22 Intel Corporation | ||
# SPDX-License-Identifier: BSD-3-Clause | ||
# See: https://spdx.org/licenses/ | ||
|
||
import numpy as np | ||
|
||
from typing import Optional, Union, Type | ||
|
||
from lava.magma.core.process.variable import Var | ||
from lava.magma.core.process.process import AbstractProcess | ||
from lava.magma.core.process.ports.ports import InPort, OutPort, RefPort | ||
|
||
from lava.magma.core.resources import CPU | ||
from lava.magma.core.decorator import implements, requires, tag | ||
from lava.magma.core.model.py.model import PyLoihiProcessModel | ||
from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol | ||
from lava.magma.core.model.py.type import LavaPyType | ||
from lava.magma.core.model.py.ports import PyInPort, PyOutPort, PyRefPort | ||
|
||
|
||
class Buffer(AbstractProcess): | ||
"""Buffer receives data from OutPorts or VarPorts in each | ||
timestep. | ||
|
||
To add a connection, call `connect` with either an InPort | ||
or a Var from another process. | ||
|
||
To read the contents of the buffer, call `get` on the Var | ||
returned by a call to `connect` while the process model | ||
is still active (i.e. before calling `process.stop`). | ||
|
||
TODO: Implement 'wrap_around', 'reallocate', 'read_write_file' | ||
TODO: Support OutPorts also | ||
|
||
Parameters | ||
---------- | ||
length: int, default = 100 | ||
The number of timesteps that can be recorded in the buffer. | ||
overflow: str, default = 'raise_exception' | ||
The desired behavior when the buffer overflows. Options are | ||
'raise_exception', 'wrap_around', and 'reallocate'. | ||
""" | ||
def __init__(self, | ||
*, | ||
length: int = 100, | ||
overflow: str = 'raise_error') -> None: | ||
super().__init__(length=length, overflow=overflow, | ||
map_out=[], map_in=[], map_ref=[]) | ||
self.length = length | ||
self.overflow = overflow | ||
self.map_out = [] | ||
self.map_in = [] | ||
self.map_ref = [] | ||
self.index = 1000 | ||
|
||
def connect(self, other: Union[InPort, OutPort, Var], | ||
init: Optional[np.ndarray] = None) -> Var: | ||
"""Connect the buffer to an OutPort or Var from another process. | ||
Calling this method will instantiate a new InPort or RefPort as | ||
needed in the buffer and a corresponding Var of the appropriate | ||
shape and length. | ||
Parameters | ||
---------- | ||
other: Union[OutPort, Var] | ||
The other port or var to connect to and store in the buffer. | ||
init: Optional[ndarray] | ||
The initial value of the buffer Var. This will determine the | ||
values sent from an InPort buffer and the default values for | ||
an OutPort or RefPort buffer. | ||
Returns | ||
------- | ||
The Var which will store the buffered data. | ||
""" | ||
index = self.index | ||
var_shape = other.shape + (self.length,) | ||
if init is None: | ||
init = np.zeros(var_shape) | ||
var = Var(shape=var_shape, init=init) | ||
var.name = f'Var{index}' | ||
setattr(self, var.name, var) | ||
|
||
if isinstance(other, InPort): | ||
port = OutPort(shape=other.shape) | ||
port.name = f'Out{index}' | ||
other.connect_from(port) | ||
self.map_out.append((var.name, port.name)) | ||
self.proc_params.overwrite('map_out', self.map_out) | ||
elif isinstance(other, OutPort): | ||
port = InPort(shape=other.shape) | ||
port.name = f'In{index}' | ||
other.connect(port) | ||
self.map_in.append((var.name, port.name)) | ||
self.proc_params.overwrite('map_in', self.map_in) | ||
elif isinstance(other, Var): | ||
port = RefPort(shape=other.shape) | ||
port.name = f'Ref{index}' | ||
port.connect_var(other) | ||
self.map_ref.append((var.name, port.name)) | ||
self.proc_params.overwrite('map_ref', self.map_ref) | ||
else: | ||
raise ValueError(f'Other {other} is not an InPort, OutPort, ' | ||
'or Var.') | ||
setattr(self, port.name, port) | ||
self._post_init() | ||
self.index += 1 | ||
return var | ||
|
||
|
||
class MetaPyBuffer(type(PyLoihiProcessModel)): | ||
"""This metaclass allows dynamic port and var generation.""" | ||
def __getattr__(cls, name): | ||
if 'In' in name: | ||
return LavaPyType(PyInPort.VEC_DENSE, float) | ||
elif 'Out' in name: | ||
return LavaPyType(PyOutPort.VEC_DENSE, float) | ||
elif 'Ref' in name: | ||
return LavaPyType(PyRefPort.VEC_DENSE, float) | ||
elif 'Var' in name: | ||
return LavaPyType(np.ndarray, float) | ||
else: | ||
raise AttributeError(name=name, obj=cls) | ||
|
||
|
||
@implements(proc=Buffer, protocol=LoihiProtocol) | ||
@requires(CPU) | ||
class PyBuffer(PyLoihiProcessModel, metaclass=MetaPyBuffer): | ||
"""Python CPU model for Buffer. Uses dense floating point numpy | ||
arrays for buffer storage and operations.""" | ||
def __init__(self, proc_params): | ||
super().__init__(proc_params) | ||
self.length = proc_params['length'] | ||
self.overflow = proc_params['overflow'] | ||
self.map_in = proc_params['map_in'] | ||
self.map_out = proc_params['map_out'] | ||
self.map_ref = proc_params['map_ref'] | ||
|
||
for var, port in self.map_in: | ||
setattr(self, var, LavaPyType(np.ndarray, float)) | ||
setattr(self, port, LavaPyType(PyInPort.VEC_DENSE, float)) | ||
|
||
for var, port in self.map_out: | ||
setattr(self, var, LavaPyType(np.ndarray, float)) | ||
setattr(self, port, LavaPyType(PyOutPort.VEC_DENSE, float)) | ||
|
||
for var, port in self.map_ref: | ||
setattr(self, var, LavaPyType(np.ndarray, float)) | ||
setattr(self, port, LavaPyType(PyRefPort.VEC_DENSE, float)) | ||
|
||
def run_spk(self) -> None: | ||
"""Read InPorts and write to buffer Vars and read from buffer | ||
Vars to write to OutPorts.""" | ||
t = self.time_step - 1 | ||
if t >= self.length: | ||
self.do_overflow() | ||
for var, port in self.map_in: | ||
data = getattr(self, port).recv() | ||
getattr(self, var)[..., t] = data | ||
for var, port in self.map_out: | ||
data = getattr(self, var)[..., t] | ||
getattr(self, port).send(data) | ||
|
||
def post_guard(self) -> None: | ||
"""Do management phase only if needed for RefPort reads.""" | ||
return len(self.map_ref) > 0 | ||
|
||
def run_post_mgmt(self) -> None: | ||
"""Read RefPorts and write to buffer Vars.""" | ||
t = self.time_step - 1 | ||
if t >= self.length: | ||
self.do_overflow() | ||
for var, port in self.map_ref: | ||
data = getattr(self, port).read() | ||
getattr(self, var)[..., t] = data | ||
|
||
def do_overflow(self) -> None: | ||
"""Implement overflow behavior.""" | ||
if self.overflow == 'raise_error': | ||
raise RuntimeError(f'PyBuffer overflow: timestep {self.time_step}' | ||
' is greater than length {self.length}') | ||
else: | ||
raise NotImplementedError(f'PyBuffer overflow: overflow ' | ||
'{self.overflow} is not implemented.') |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
# Copyright (C) 2021-22 Intel Corporation | ||
# SPDX-License-Identifier: BSD-3-Clause | ||
# See: https://spdx.org/licenses/ | ||
|
||
from typing import List | ||
import unittest | ||
import numpy as np | ||
from lava.magma.core.process.ports.ports import OutPort | ||
|
||
from lava.magma.core.process.variable import Var | ||
from lava.magma.core.run_configs import Loihi2SimCfg | ||
from lava.magma.core.run_conditions import RunSteps | ||
from lava.proc.io.injector import Injector | ||
from lava.proc.io.extractor import Extractor | ||
from lava.proc.lif.process import LIF | ||
from lava.proc.io.buffer import Buffer | ||
|
||
|
||
class TestBuffer(unittest.TestCase): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For consistency with how we test other Processes and ProcessModels in Lava, we might want to split tests into TestProcess and TestProcessModel. Honestly, I don't like that divide myself, and I prefer it the way you did it here... |
||
def test_create_buffer(self): | ||
buffer = Buffer() | ||
self.assertEqual(buffer.length, 100) | ||
self.assertEqual(buffer.overflow, 'raise_error') | ||
|
||
def test_connect_buffer(self): | ||
buffer = Buffer() | ||
# Test connecting to a Var | ||
var_shape = (10,) | ||
buffer_shape = (10, 100) | ||
test_var = Var(shape=var_shape) | ||
self.assertFalse(hasattr(buffer, 'Var1000')) | ||
self.assertFalse(hasattr(buffer, 'Ref1000')) | ||
buffer.connect(test_var) | ||
self.assertTrue(hasattr(buffer, 'Var1000')) | ||
self.assertTrue(hasattr(buffer, 'Ref1000')) | ||
self.assertIn(buffer.Var1000, buffer.vars) | ||
self.assertIn(buffer.Ref1000, buffer.ref_ports) | ||
self.assertEqual(buffer.Ref1000.shape, var_shape) | ||
self.assertEqual(buffer.Var1000.shape, buffer_shape) | ||
# Test connecting to an OutPort | ||
test_out_port = OutPort(shape=var_shape) | ||
v = buffer.connect(test_out_port) | ||
self.assertEqual(v.shape, buffer_shape) | ||
self.assertTrue(hasattr(buffer, v.name)) | ||
self.assertEqual(len(list(buffer.in_ports)), 1) | ||
|
||
def test_run_buffer(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For consistency with other test names, this should be something like |
||
num_steps = 10 | ||
injector = Injector(shape=(1,)) | ||
buffer = Buffer(length=10) | ||
v0 = buffer.connect(injector.out_port) | ||
buffer.create_runtime(run_cfg=Loihi2SimCfg()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it necessary to explicitly call |
||
buffer.run(condition=RunSteps(num_steps, blocking=False)) | ||
for t in range(num_steps): | ||
injector.send(np.full((1,), t)) | ||
buffer.wait() | ||
data = v0.get().flatten().astype(int).tolist() | ||
buffer.stop() | ||
self.assertSequenceEqual(data, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) | ||
|
||
def test_buffer_with_2vars(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For consistency with other test names, this should be something like |
||
"""Test to ensure that a Buffer can connect to multiple Vars.""" | ||
num_steps = 10 | ||
injector = Injector(shape=(1,)) | ||
lif = LIF(shape=(1,), du=1) | ||
buffer = Buffer(length=10) | ||
injector.out_port.connect(lif.a_in) | ||
u = buffer.connect(lif.u) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So this creates a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can see how this is slightly confusing, because the use of dynamic port and var creation doesn't allow you to immediately see what's happening in PyBuffer, but the port and vars are added to the Buffer process and the corresponding PyPort and PyVars are added to the PyBuffer process model. They can be accessed in the buffer.in_ports, buffer.out_ports, buffer.ref_ports, and buffer.vars collections, or they can be accessed directly with the use of a name as an attribute of the buffer. Unfortunately, I hit a small snag where I couldn't quite find a way to achieve user-specified names for those objects while also making it easy to intercept calls to the metaclass.getattr (necessary to convince the compiler to allow the process model to be built, but that's sort of a hack that could later be fixed in the compiler). Thus, the name of the var that will be used as a buffer is currently So, you could do:
I'll think a bit more on whether there's a way to let the user specify the name of the var, as I do find that more compelling than |
||
v = buffer.connect(lif.v) | ||
buffer.create_runtime(run_cfg=Loihi2SimCfg()) | ||
buffer.run(condition=RunSteps(num_steps, blocking=False)) | ||
for t in range(num_steps): | ||
injector.send(np.full((1,), t)) | ||
buffer.wait() | ||
udata = u.get().flatten().astype(int).tolist() | ||
vdata = v.get().flatten().astype(int).tolist() | ||
buffer.stop() | ||
self.assertSequenceEqual(udata, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) | ||
self.assertSequenceEqual(vdata, [0, 1, 3, 6, 10, 0, 6, 0, 8, 0]) | ||
|
||
def test_multiple_buffers(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For consistency with other test names, this should be something like |
||
"""Test to ensure that two Buffers in the same process graph | ||
do not interfere with one another due to dynamic Vars/Ports.""" | ||
num_steps = 10 | ||
injector = Injector(shape=(1,)) | ||
lif = LIF(shape=(1,), du=1) | ||
ubuffer = Buffer(length=10) | ||
vbuffer = Buffer(length=10) | ||
injector.out_port.connect(lif.a_in) | ||
u = ubuffer.connect(lif.u) | ||
v = vbuffer.connect(lif.v) | ||
ubuffer.create_runtime(run_cfg=Loihi2SimCfg()) | ||
ubuffer.run(condition=RunSteps(num_steps, blocking=False)) | ||
for t in range(num_steps): | ||
injector.send(np.full((1,), t)) | ||
ubuffer.wait() | ||
udata = u.get().flatten().astype(int).tolist() | ||
vdata = v.get().flatten().astype(int).tolist() | ||
ubuffer.stop() | ||
self.assertSequenceEqual(udata, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) | ||
self.assertSequenceEqual(vdata, [0, 1, 3, 6, 10, 0, 6, 0, 8, 0]) | ||
|
||
def test_output_buffer(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For consistency with other test names, this should be something like |
||
num_steps = 10 | ||
extractor = Extractor(shape=(1,)) | ||
buffer = Buffer(length=10) | ||
vdata = np.array(range(10)).reshape((1, 10)) | ||
v = buffer.connect(extractor.in_port, init=vdata) | ||
buffer.create_runtime(run_cfg=Loihi2SimCfg()) | ||
buffer.run(condition=RunSteps(num_steps, blocking=False)) | ||
for t in range(num_steps): | ||
self.assertEqual(extractor.receive(), vdata[0,t]) | ||
buffer.wait() | ||
buffer.stop() | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may be helpful to split this into a
connect()
and aconnect_from()
method, as we are doing it for ports. As it is now, I'm having trouble understanding the direction in which data flow will flow.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, that would make sense. I'll make that change.
I don't love the names
connect
andconnect_from
but they're not awful and this would better differentiate the behavior of connecting to an InPort or Var (which will send the values from init) vs connecting from an OutPort or Var (in which case init will generally be overwritten).