Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a new Buffer class that can flexibly combine the functions of current RingBuffer, Read, and Write. #771

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
182 changes: 182 additions & 0 deletions src/lava/proc/io/buffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
# Copyright (C) 2021-22 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
# See: https://spdx.org/licenses/

import numpy as np

from typing import Optional, Union, Type

from lava.magma.core.process.variable import Var
from lava.magma.core.process.process import AbstractProcess
from lava.magma.core.process.ports.ports import InPort, OutPort, RefPort

from lava.magma.core.resources import CPU
from lava.magma.core.decorator import implements, requires, tag
from lava.magma.core.model.py.model import PyLoihiProcessModel
from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol
from lava.magma.core.model.py.type import LavaPyType
from lava.magma.core.model.py.ports import PyInPort, PyOutPort, PyRefPort


class Buffer(AbstractProcess):
"""Buffer receives data from OutPorts or VarPorts in each
timestep.

To add a connection, call `connect` with either an InPort
or a Var from another process.

To read the contents of the buffer, call `get` on the Var
returned by a call to `connect` while the process model
is still active (i.e. before calling `process.stop`).

TODO: Implement 'wrap_around', 'reallocate', 'read_write_file'
TODO: Support OutPorts also

Parameters
----------
length: int, default = 100
The number of timesteps that can be recorded in the buffer.
overflow: str, default = 'raise_exception'
The desired behavior when the buffer overflows. Options are
'raise_exception', 'wrap_around', and 'reallocate'.
"""
def __init__(self,
*,
length: int = 100,
overflow: str = 'raise_error') -> None:
super().__init__(length=length, overflow=overflow,
map_out=[], map_in=[], map_ref=[])
self.length = length
self.overflow = overflow
self.map_out = []
self.map_in = []
self.map_ref = []
self.index = 1000

def connect(self, other: Union[InPort, OutPort, Var],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be helpful to split this into a connect() and a connect_from() method, as we are doing it for ports. As it is now, I'm having trouble understanding the direction in which data flow will flow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, that would make sense. I'll make that change.

I don't love the names connect and connect_from but they're not awful and this would better differentiate the behavior of connecting to an InPort or Var (which will send the values from init) vs connecting from an OutPort or Var (in which case init will generally be overwritten).

init: Optional[np.ndarray] = None) -> Var:
"""Connect the buffer to an OutPort or Var from another process.
Calling this method will instantiate a new InPort or RefPort as
needed in the buffer and a corresponding Var of the appropriate
shape and length.
Parameters
----------
other: Union[OutPort, Var]
The other port or var to connect to and store in the buffer.
init: Optional[ndarray]
The initial value of the buffer Var. This will determine the
values sent from an InPort buffer and the default values for
an OutPort or RefPort buffer.
Returns
-------
The Var which will store the buffered data.
"""
index = self.index
var_shape = other.shape + (self.length,)
if init is None:
init = np.zeros(var_shape)
var = Var(shape=var_shape, init=init)
var.name = f'Var{index}'
setattr(self, var.name, var)

if isinstance(other, InPort):
port = OutPort(shape=other.shape)
port.name = f'Out{index}'
other.connect_from(port)
self.map_out.append((var.name, port.name))
self.proc_params.overwrite('map_out', self.map_out)
elif isinstance(other, OutPort):
port = InPort(shape=other.shape)
port.name = f'In{index}'
other.connect(port)
self.map_in.append((var.name, port.name))
self.proc_params.overwrite('map_in', self.map_in)
elif isinstance(other, Var):
port = RefPort(shape=other.shape)
port.name = f'Ref{index}'
port.connect_var(other)
self.map_ref.append((var.name, port.name))
self.proc_params.overwrite('map_ref', self.map_ref)
else:
raise ValueError(f'Other {other} is not an InPort, OutPort, '
'or Var.')
setattr(self, port.name, port)
self._post_init()
self.index += 1
return var


class MetaPyBuffer(type(PyLoihiProcessModel)):
"""This metaclass allows dynamic port and var generation."""
def __getattr__(cls, name):
if 'In' in name:
return LavaPyType(PyInPort.VEC_DENSE, float)
elif 'Out' in name:
return LavaPyType(PyOutPort.VEC_DENSE, float)
elif 'Ref' in name:
return LavaPyType(PyRefPort.VEC_DENSE, float)
elif 'Var' in name:
return LavaPyType(np.ndarray, float)
else:
raise AttributeError(name=name, obj=cls)


@implements(proc=Buffer, protocol=LoihiProtocol)
@requires(CPU)
class PyBuffer(PyLoihiProcessModel, metaclass=MetaPyBuffer):
"""Python CPU model for Buffer. Uses dense floating point numpy
arrays for buffer storage and operations."""
def __init__(self, proc_params):
super().__init__(proc_params)
self.length = proc_params['length']
self.overflow = proc_params['overflow']
self.map_in = proc_params['map_in']
self.map_out = proc_params['map_out']
self.map_ref = proc_params['map_ref']

for var, port in self.map_in:
setattr(self, var, LavaPyType(np.ndarray, float))
setattr(self, port, LavaPyType(PyInPort.VEC_DENSE, float))

for var, port in self.map_out:
setattr(self, var, LavaPyType(np.ndarray, float))
setattr(self, port, LavaPyType(PyOutPort.VEC_DENSE, float))

for var, port in self.map_ref:
setattr(self, var, LavaPyType(np.ndarray, float))
setattr(self, port, LavaPyType(PyRefPort.VEC_DENSE, float))

def run_spk(self) -> None:
"""Read InPorts and write to buffer Vars and read from buffer
Vars to write to OutPorts."""
t = self.time_step - 1
if t >= self.length:
self.do_overflow()
for var, port in self.map_in:
data = getattr(self, port).recv()
getattr(self, var)[..., t] = data
for var, port in self.map_out:
data = getattr(self, var)[..., t]
getattr(self, port).send(data)

def post_guard(self) -> None:
"""Do management phase only if needed for RefPort reads."""
return len(self.map_ref) > 0

def run_post_mgmt(self) -> None:
"""Read RefPorts and write to buffer Vars."""
t = self.time_step - 1
if t >= self.length:
self.do_overflow()
for var, port in self.map_ref:
data = getattr(self, port).read()
getattr(self, var)[..., t] = data

def do_overflow(self) -> None:
"""Implement overflow behavior."""
if self.overflow == 'raise_error':
raise RuntimeError(f'PyBuffer overflow: timestep {self.time_step}'
' is greater than length {self.length}')
else:
raise NotImplementedError(f'PyBuffer overflow: overflow '
'{self.overflow} is not implemented.')
118 changes: 118 additions & 0 deletions tests/lava/proc/io/test_buffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# Copyright (C) 2021-22 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
# See: https://spdx.org/licenses/

from typing import List
import unittest
import numpy as np
from lava.magma.core.process.ports.ports import OutPort

from lava.magma.core.process.variable import Var
from lava.magma.core.run_configs import Loihi2SimCfg
from lava.magma.core.run_conditions import RunSteps
from lava.proc.io.injector import Injector
from lava.proc.io.extractor import Extractor
from lava.proc.lif.process import LIF
from lava.proc.io.buffer import Buffer


class TestBuffer(unittest.TestCase):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with how we test other Processes and ProcessModels in Lava, we might want to split tests into TestProcess and TestProcessModel. Honestly, I don't like that divide myself, and I prefer it the way you did it here...
But I think it is crucial for code quality to be consistent in how we test these things.
So if we decide to go this route and drop the divide between TestProcess and TestProcessModel in the future, we should at least write an issue about the need to refactor other Process tests.

def test_create_buffer(self):
buffer = Buffer()
self.assertEqual(buffer.length, 100)
self.assertEqual(buffer.overflow, 'raise_error')

def test_connect_buffer(self):
buffer = Buffer()
# Test connecting to a Var
var_shape = (10,)
buffer_shape = (10, 100)
test_var = Var(shape=var_shape)
self.assertFalse(hasattr(buffer, 'Var1000'))
self.assertFalse(hasattr(buffer, 'Ref1000'))
buffer.connect(test_var)
self.assertTrue(hasattr(buffer, 'Var1000'))
self.assertTrue(hasattr(buffer, 'Ref1000'))
self.assertIn(buffer.Var1000, buffer.vars)
self.assertIn(buffer.Ref1000, buffer.ref_ports)
self.assertEqual(buffer.Ref1000.shape, var_shape)
self.assertEqual(buffer.Var1000.shape, buffer_shape)
# Test connecting to an OutPort
test_out_port = OutPort(shape=var_shape)
v = buffer.connect(test_out_port)
self.assertEqual(v.shape, buffer_shape)
self.assertTrue(hasattr(buffer, v.name))
self.assertEqual(len(list(buffer.in_ports)), 1)

def test_run_buffer(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with other test names, this should be something like test_run_single_input_buffer.

num_steps = 10
injector = Injector(shape=(1,))
buffer = Buffer(length=10)
v0 = buffer.connect(injector.out_port)
buffer.create_runtime(run_cfg=Loihi2SimCfg())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to explicitly call buffer.create_runtime before running ?
If so, please add a mention of this in the docstring of the Buffer class otherwise it might not be very straightforward for users to find out about it.

buffer.run(condition=RunSteps(num_steps, blocking=False))
for t in range(num_steps):
injector.send(np.full((1,), t))
buffer.wait()
data = v0.get().flatten().astype(int).tolist()
buffer.stop()
self.assertSequenceEqual(data, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

def test_buffer_with_2vars(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with other test names, this should be something like test_run_single_input_buffer_2_vars.

"""Test to ensure that a Buffer can connect to multiple Vars."""
num_steps = 10
injector = Injector(shape=(1,))
lif = LIF(shape=(1,), du=1)
buffer = Buffer(length=10)
injector.out_port.connect(lif.a_in)
u = buffer.connect(lif.u)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this creates a Var u but does not place it inside the buffer object? I was somehow not expecting that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see how this is slightly confusing, because the use of dynamic port and var creation doesn't allow you to immediately see what's happening in PyBuffer, but the port and vars are added to the Buffer process and the corresponding PyPort and PyVars are added to the PyBuffer process model.

They can be accessed in the buffer.in_ports, buffer.out_ports, buffer.ref_ports, and buffer.vars collections, or they can be accessed directly with the use of a name as an attribute of the buffer. Unfortunately, I hit a small snag where I couldn't quite find a way to achieve user-specified names for those objects while also making it easy to intercept calls to the metaclass.getattr (necessary to convince the compiler to allow the process model to be built, but that's sort of a hack that could later be fixed in the compiler). Thus, the name of the var that will be used as a buffer is currently VarXXXX with the x's pulled from a incrementing sequence starting at 1000.

So, you could do:

buffer = Buffer()
u = buffer.connect(some_other_proc.some_port)
assert(u == buffer.Var1000)

I'll think a bit more on whether there's a way to let the user specify the name of the var, as I do find that more compelling than Var1000.

v = buffer.connect(lif.v)
buffer.create_runtime(run_cfg=Loihi2SimCfg())
buffer.run(condition=RunSteps(num_steps, blocking=False))
for t in range(num_steps):
injector.send(np.full((1,), t))
buffer.wait()
udata = u.get().flatten().astype(int).tolist()
vdata = v.get().flatten().astype(int).tolist()
buffer.stop()
self.assertSequenceEqual(udata, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
self.assertSequenceEqual(vdata, [0, 1, 3, 6, 10, 0, 6, 0, 8, 0])

def test_multiple_buffers(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with other test names, this should be something like test_run_multiple_input_buffer.

"""Test to ensure that two Buffers in the same process graph
do not interfere with one another due to dynamic Vars/Ports."""
num_steps = 10
injector = Injector(shape=(1,))
lif = LIF(shape=(1,), du=1)
ubuffer = Buffer(length=10)
vbuffer = Buffer(length=10)
injector.out_port.connect(lif.a_in)
u = ubuffer.connect(lif.u)
v = vbuffer.connect(lif.v)
ubuffer.create_runtime(run_cfg=Loihi2SimCfg())
ubuffer.run(condition=RunSteps(num_steps, blocking=False))
for t in range(num_steps):
injector.send(np.full((1,), t))
ubuffer.wait()
udata = u.get().flatten().astype(int).tolist()
vdata = v.get().flatten().astype(int).tolist()
ubuffer.stop()
self.assertSequenceEqual(udata, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
self.assertSequenceEqual(vdata, [0, 1, 3, 6, 10, 0, 6, 0, 8, 0])

def test_output_buffer(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with other test names, this should be something like test_run_single_output_buffer.
Oh, and don't forget the docstring for this test.

num_steps = 10
extractor = Extractor(shape=(1,))
buffer = Buffer(length=10)
vdata = np.array(range(10)).reshape((1, 10))
v = buffer.connect(extractor.in_port, init=vdata)
buffer.create_runtime(run_cfg=Loihi2SimCfg())
buffer.run(condition=RunSteps(num_steps, blocking=False))
for t in range(num_steps):
self.assertEqual(extractor.receive(), vdata[0,t])
buffer.wait()
buffer.stop()


if __name__ == '__main__':
unittest.main()