Skip to content

Commit

Permalink
Merge pull request #1189 from SpiNNakerManchester/rd2
Browse files Browse the repository at this point in the history
recording data fixes
  • Loading branch information
rowleya authored Sep 11, 2024
2 parents 0143c79 + 865b9fb commit acf3713
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,6 @@ void readMemory(BufferManagerStorage.Region region,
var buffer = new byte[region.size];
readMemory(region.core.asChipLocation(), region.startAddress,
region.size, new BufferAccumulator(buffer));
storage.appendRecordingContents(region, buffer);
storage.extractRecordingContents(region, buffer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,18 +136,21 @@ public RecordingRegion getRecordingRegion(RegionLocation location) {
* The X, Y, P and Region
* @param data
* data to be stored
* @param isRecording
* If this is a recording region
* @throws StorageException
* If there is a problem storing the data.
*/
public void storeDataInRegionBuffer(RegionLocation location,
ByteBuffer data) throws StorageException {
ByteBuffer data, boolean isRecording) throws StorageException {
if (log.isInfoEnabled()) {
log.info("retrieved {} bytes from region {} of {}",
data.remaining(), location.region,
location.asCoreLocation());
}
storage.appendRecordingContents(
new Region(location, location.region, NULL, 0), data);
storage.extractRecordingContents(
new Region(location, location.region, NULL, 0, isRecording),
data);
}

/**
Expand All @@ -158,12 +161,15 @@ public void storeDataInRegionBuffer(RegionLocation location,
* The X, Y, P and Region
* @param data
* data to be stored
* @param isRecording
* if this is a recording region
* @throws StorageException
* If there is a problem storing the data.
*/
public void flushingDataFromRegion(RegionLocation location, ByteBuffer data)
public void flushingDataFromRegion(
RegionLocation location, ByteBuffer data, boolean isRecording)
throws StorageException {
storeDataInRegionBuffer(location, data);
storeDataInRegionBuffer(location, data, isRecording);
isFlushed.put(location, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ private Map<ChipLocation, List<WorkItems>> discoverActualWork(
for (var p : m.getPlacements()) {
var regions = new ArrayList<Region>();
for (int id : p.getVertex().getRecordedRegionIds()) {
var r = getRegion(p, id);
var r = getRecordingRegion(p, id);
if (r.size > 0) {
regions.add(r);
count += 1;
Expand All @@ -269,7 +269,7 @@ private Map<ChipLocation, List<WorkItems>> discoverActualWork(
}
for (var dr : p.getVertex().getDownloadRegions()) {
regions.add(new Region(p, dr.getIndex(),
dr.getAddress(), dr.getSize()));
dr.getAddress(), dr.getSize(), false));
count++;
}
workitems.add(new WorkItems(m, regions));
Expand Down Expand Up @@ -486,15 +486,15 @@ private static List<String> describeChunk(Chunk<Byte> chunk) {
*/
@UsedInJavadocOnly(BufferManagerStorage.class)
@ForOverride
protected abstract Region getRegion(Placement placement, int regionID)
protected abstract Region getRecordingRegion(
Placement placement, int regionID)
throws IOException, ProcessException, StorageException,
InterruptedException;

/**
* Store the data retrieved from a region. Called (at most) once for each
* element in the list returned by {@link #getRegion(Placement,int)
* getRegion(...)}, <i>in order</i>. No guarantee is made about which thread
* will call this method.
* Store the data retrieved from a region
*
* No guarantee is made about which thread will call this method.
*
* @param r
* Where the data came from.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void getDataForPlacements(List<Placement> placements)
var location = new RegionLocation(placement,
region.getIndex());
readSomeData(location, region.getAddress(),
region.getSize());
region.getSize(), false);
}
}
}
Expand All @@ -172,7 +172,7 @@ private void getDataForPlacement(Placement placement, int recordingRegionId)

// Read the data
var region = receivedData.getRecordingRegion(location);
readSomeData(location, region.data, region.size);
readSomeData(location, region.data, region.size, true);
}

private static final long MAX_UINT = 0xFFFFFFFFL;
Expand All @@ -182,8 +182,8 @@ private static boolean is32bit(long value) {
}

private void readSomeData(RegionLocation location, MemoryLocation address,
long length) throws IOException, StorageException, ProcessException,
InterruptedException {
long length, boolean isRecording) throws IOException,
StorageException, ProcessException, InterruptedException {
if (!is32bit(length)) {
throw new IllegalArgumentException("non-32-bit argument");
}
Expand All @@ -192,7 +192,7 @@ private void readSomeData(RegionLocation location, MemoryLocation address,
address);
}
var data = requestData(location, address, (int) length);
receivedData.flushingDataFromRegion(location, data);
receivedData.flushingDataFromRegion(location, data, isRecording);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,15 @@ private IntBuffer getCoreRegionTable(CoreLocation core, Vertex vertex)
}

@Override
protected Region getRegion(Placement placement, int regionID)
protected Region getRecordingRegion(Placement placement, int regionID)
throws IOException, ProcessException, InterruptedException {
var b = getCoreRegionTable(placement.asCoreLocation(),
placement.getVertex());
// TODO This is wrong because of shared regions!
int size = b.get(regionID + 1) - b.get(regionID);
return new Region(
placement, regionID, new MemoryLocation(b.get(regionID)), size);
placement, regionID, new MemoryLocation(b.get(regionID)), size,
true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,14 @@ private List<RecordingRegion> getRegions(Placement placement)
}

@Override
protected Region getRegion(Placement placement, int index)
protected Region getRecordingRegion(
Placement placement, int index)
throws IOException, ProcessException, InterruptedException {
var region = getRegions(placement).get(index);
log.debug("got region of {} R:{} as {}", placement.asCoreLocation(),
index, region);
return new Region(placement, index, region.data, (int) region.size);
return new Region(placement, index, region.data, (int) region.size,
true);
}

@Override
Expand All @@ -132,7 +134,7 @@ protected void storeData(Region r, ByteBuffer data) {
log.info("storing region data for {} R:{} from {} as {} bytes",
r.core, r.regionIndex, r.startAddress, data.remaining());
try {
database.appendRecordingContents(r, data);
database.extractRecordingContents(r, data);
numWrites++;
} catch (StorageException e) {
log.error("failed to write to database", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ class Region {
@PositiveOrZero
public final int finalIgnore;

/**
* States is a recording region
* True if this region is appended after every run/loop.
* False is this region is overwritten aevery run.
*/
@NotNull
public final boolean isRecording;

/**
* Create a region descriptor.
*
Expand All @@ -180,9 +188,14 @@ class Region {
* @param size
* How much data should be downloaded? <em>This is not
* necessarily the size of the region.</em>
* @param isRecording
* States is a recording region
* True if this region is appended after every run/loop.
* False is this region is overwritten aevery run.
*
*/
public Region(HasCoreLocation core, int regionIndex,
MemoryLocation startLocation, int size) {
MemoryLocation startLocation, int size, boolean isRecording) {
this.core = core.asCoreLocation();
this.regionIndex = regionIndex;
realSize = size;
Expand All @@ -193,6 +206,7 @@ public Region(HasCoreLocation core, int regionIndex,
finalIgnore = (INT_SIZE - (size % INT_SIZE)) % INT_SIZE;
size += finalIgnore;
this.size = size;
this.isRecording = isRecording;
}

/**
Expand Down Expand Up @@ -257,7 +271,7 @@ default int storeRegionContents(Region region, ByteBuffer contents)
}

/**
* Adds some bytes to the database. The bytes represent part of the contents
* Extract some bytes to the database. The bytes represent the contents
* of a recording region of a particular SpiNNaker core.
*
* @param region
Expand All @@ -267,26 +281,26 @@ default int storeRegionContents(Region region, ByteBuffer contents)
* @throws StorageException
* If anything goes wrong.
*/
void appendRecordingContents(Region region, byte[] contents)
void extractRecordingContents(Region region, byte[] contents)
throws StorageException;

void insertMockExtraction() throws StorageException;

/**
* Adds some bytes to the database. The bytes represent part of the contents
* Extract some bytes to the database. The bytes represent the contents
* of a recording region of a particular SpiNNaker core.
*
* @param region
* The recording region that is being recorded.
* @param contents
* The contents to append.
* The contents to extract.
* @throws StorageException
* If anything goes wrong.
*/
default void appendRecordingContents(Region region, ByteBuffer contents)
default void extractRecordingContents(Region region, ByteBuffer contents)
throws StorageException {
var ary = new byte[contents.remaining()];
contents.slice().get(ary);
appendRecordingContents(region, ary);
extractRecordingContents(region, ary);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ private SQL() {
/** Create a region record. */
@Parameter("core_id")
@Parameter("local_region_index")
@Parameter("is_recording")
@ResultColumn("region_id")
static final String INSERT_REGION = "INSERT INTO "
+ "region(core_id, local_region_index)"
+ " VALUES (?, ?) RETURNING region_id";
+ "region(core_id, local_region_index, is_recording)"
+ " VALUES (?, ?, ?) RETURNING region_id";


/** For testing create an extraction record.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private static int getRecordingRegion(Connection conn, int coreID,
}
try (var s = conn.prepareStatement(INSERT_REGION)) {
// core_id, local_region_index, address
setArguments(s, coreID, region.regionIndex);
setArguments(s, coreID, region.regionIndex, region.isRecording);
try (var rs = s.executeQuery()) {
while (rs.next()) {
return rs.getInt("region_id");
Expand All @@ -128,6 +128,21 @@ private static int getRecordingRegion(Connection conn, int coreID,
"could not make or find recording region record");
}

private static int getExistingRecordingRegion(
Connection conn, int coreID, Region region) throws SQLException {
try (var s = conn.prepareStatement(GET_REGION)) {
// core_id, local_region_index
setArguments(s, coreID, region.regionIndex);
try (var rs = s.executeQuery()) {
while (rs.next()) {
return rs.getInt("region_id");
}
}
}
throw new IllegalStateException(
"could not find recording region record");
}

private static int insertMockExtraction(Connection conn)
throws SQLException {
try (var s = conn.prepareStatement(INSERT_MOCK_EXTRACTION)) {
Expand All @@ -147,7 +162,7 @@ public void insertMockExtraction() throws StorageException {
"Mocking Extraction");
}

private void appendRecordingContents(Connection conn, int regionID,
private void extractRecordingContents(Connection conn, int regionID,
int lastExtractionId, byte[] content) throws SQLException {
int chunkLen = content.length;
var chunk = new ByteArrayInputStream(content);
Expand Down Expand Up @@ -191,7 +206,7 @@ private int addRegionData(Connection conn, int regionID, int extractionId,
}

@Override
public void appendRecordingContents(Region region, byte[] contents)
public void extractRecordingContents(Region region, byte[] contents)
throws StorageException {
// Strip off any prefix and suffix added to make the read aligned
byte[] tmp;
Expand All @@ -201,7 +216,7 @@ public void appendRecordingContents(Region region, byte[] contents)
tmp = new byte[region.realSize];
arraycopy(contents, region.initialIgnore, tmp, 0, region.realSize);
}
callV(conn -> appendRecordContents(conn, region, tmp),
callV(conn -> extractRecordContents(conn, region, tmp),
"creating or adding to a recorded region");
}

Expand All @@ -216,14 +231,14 @@ public void appendRecordingContents(Region region, byte[] contents)
* The bytes to append.
* @throws SQLException
* If anything goes wrong.
* @see #appendRecordingContents(Region,int,byte[])
* @see #extractRecordingContents(Region,int,byte[])
*/
private void appendRecordContents(Connection conn, Region region,
private void extractRecordContents(Connection conn, Region region,
byte[] contents) throws SQLException {
int coreID = getRecordingCore(conn, region.core);
int regionID = getRecordingRegion(conn, coreID, region);
int lastExtractionId = getLastExtractionId(conn);
appendRecordingContents(conn, regionID, lastExtractionId, contents);
extractRecordingContents(conn, regionID, lastExtractionId, contents);
}

@Override
Expand All @@ -238,7 +253,7 @@ private static byte[] getRecordingRegionContents(Connection conn,
var accum = new ByteArrayOutputStream();
try {
int coreID = getRecordingCore(conn, region.core);
int regionID = getRecordingRegion(conn, coreID, region);
int regionID = getExistingRecordingRegion(conn, coreID, region);
try (var s = conn.prepareStatement(FETCH_RECORDING)) {
// region_id
setArguments(s, regionID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,13 @@ CREATE TABLE IF NOT EXISTS region(
core_id INTEGER NOT NULL
REFERENCES core(core_id) ON DELETE RESTRICT,
local_region_index INTEGER NOT NULL,
address INTEGER,
content BLOB NOT NULL DEFAULT '',
content_len INTEGER DEFAULT 0,
fetches INTEGER NOT NULL DEFAULT 0,
append_time INTEGER);
is_recording INTEGER NOT NULL);
-- Every recording region has a unique vertex and index
CREATE UNIQUE INDEX IF NOT EXISTS regionSanity ON region(
core_id ASC, local_region_index ASC);

CREATE VIEW IF NOT EXISTS region_view AS
SELECT core_id, region_id, x, y, processor, local_region_index
SELECT core_id, region_id, x, y, processor, local_region_index, is_recording
FROM core NATURAL JOIN region;

CREATE TABLE IF NOT EXISTS region_data(
Expand All @@ -79,11 +75,11 @@ CREATE UNIQUE INDEX IF NOT EXISTS region_data_sanity ON region_data(
region_id ASC, extraction_id ASC);

CREATE VIEW IF NOT EXISTS region_data_view AS
SELECT core_id, region_id, extraction_id, x, y, processor, local_region_index,
content, content_len
SELECT core_id, region_id, extraction_id, x, y, processor, local_region_index,
content, content_len, is_recording
FROM region_view NATURAL JOIN region_data;

CREATE VIEW IF NOT EXISTS region_data_plus_view AS
SELECT core_id, region_id, extraction_id, x, y, processor, local_region_index,
content, content_len, run_timestep, run_time_ms, n_run, n_loop, extraction_time
SELECT core_id, region_id, extraction_id, x, y, processor, local_region_index,
content, content_len, is_recording, run_timestep, run_time_ms, n_run, n_loop, extraction_time
FROM region_data_view NATURAL JOIN extraction_view;
Loading

0 comments on commit acf3713

Please sign in to comment.