diff --git a/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/ReadMemoryProcess.java b/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/ReadMemoryProcess.java index 53373fdf44..430261e1d4 100644 --- a/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/ReadMemoryProcess.java +++ b/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/ReadMemoryProcess.java @@ -357,6 +357,6 @@ void readMemory(BufferManagerStorage.Region region, var buffer = new byte[region.size]; readMemory(region.core.asChipLocation(), region.startAddress, region.size, new BufferAccumulator(buffer)); - storage.appendRecordingContents(region, buffer); + storage.extractRecordingContents(region, buffer); } } diff --git a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/BufferedReceivingData.java b/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/BufferedReceivingData.java index 88ccd2a993..e5839ec7b0 100644 --- a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/BufferedReceivingData.java +++ b/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/BufferedReceivingData.java @@ -136,18 +136,21 @@ public RecordingRegion getRecordingRegion(RegionLocation location) { * The X, Y, P and Region * @param data * data to be stored + * @param isRecording + * If this is a recording region * @throws StorageException * If there is a problem storing the data. */ public void storeDataInRegionBuffer(RegionLocation location, - ByteBuffer data) throws StorageException { + ByteBuffer data, boolean isRecording) throws StorageException { if (log.isInfoEnabled()) { log.info("retrieved {} bytes from region {} of {}", data.remaining(), location.region, location.asCoreLocation()); } - storage.appendRecordingContents( - new Region(location, location.region, NULL, 0), data); + storage.extractRecordingContents( + new Region(location, location.region, NULL, 0, isRecording), + data); } /** @@ -158,12 +161,15 @@ public void storeDataInRegionBuffer(RegionLocation location, * The X, Y, P and Region * @param data * data to be stored + * @param isRecording + * if this is a recording region * @throws StorageException * If there is a problem storing the data. */ - public void flushingDataFromRegion(RegionLocation location, ByteBuffer data) + public void flushingDataFromRegion( + RegionLocation location, ByteBuffer data, boolean isRecording) throws StorageException { - storeDataInRegionBuffer(location, data); + storeDataInRegionBuffer(location, data, isRecording); isFlushed.put(location, true); } } diff --git a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/DataGatherer.java b/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/DataGatherer.java index 0b4ee89ed1..185342e4f7 100644 --- a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/DataGatherer.java +++ b/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/DataGatherer.java @@ -259,7 +259,7 @@ private Map> discoverActualWork( for (var p : m.getPlacements()) { var regions = new ArrayList(); for (int id : p.getVertex().getRecordedRegionIds()) { - var r = getRegion(p, id); + var r = getRecordingRegion(p, id); if (r.size > 0) { regions.add(r); count += 1; @@ -269,7 +269,7 @@ private Map> discoverActualWork( } for (var dr : p.getVertex().getDownloadRegions()) { regions.add(new Region(p, dr.getIndex(), - dr.getAddress(), dr.getSize())); + dr.getAddress(), dr.getSize(), false)); count++; } workitems.add(new WorkItems(m, regions)); @@ -486,15 +486,15 @@ private static List describeChunk(Chunk chunk) { */ @UsedInJavadocOnly(BufferManagerStorage.class) @ForOverride - protected abstract Region getRegion(Placement placement, int regionID) + protected abstract Region getRecordingRegion( + Placement placement, int regionID) throws IOException, ProcessException, StorageException, InterruptedException; /** - * Store the data retrieved from a region. Called (at most) once for each - * element in the list returned by {@link #getRegion(Placement,int) - * getRegion(...)}, in order. No guarantee is made about which thread - * will call this method. + * Store the data retrieved from a region + * + * No guarantee is made about which thread will call this method. * * @param r * Where the data came from. diff --git a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/DataReceiver.java b/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/DataReceiver.java index f7aa246352..8f053009e8 100644 --- a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/DataReceiver.java +++ b/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/DataReceiver.java @@ -146,7 +146,7 @@ public void getDataForPlacements(List placements) var location = new RegionLocation(placement, region.getIndex()); readSomeData(location, region.getAddress(), - region.getSize()); + region.getSize(), false); } } } @@ -172,7 +172,7 @@ private void getDataForPlacement(Placement placement, int recordingRegionId) // Read the data var region = receivedData.getRecordingRegion(location); - readSomeData(location, region.data, region.size); + readSomeData(location, region.data, region.size, true); } private static final long MAX_UINT = 0xFFFFFFFFL; @@ -182,8 +182,8 @@ private static boolean is32bit(long value) { } private void readSomeData(RegionLocation location, MemoryLocation address, - long length) throws IOException, StorageException, ProcessException, - InterruptedException { + long length, boolean isRecording) throws IOException, + StorageException, ProcessException, InterruptedException { if (!is32bit(length)) { throw new IllegalArgumentException("non-32-bit argument"); } @@ -192,7 +192,7 @@ private void readSomeData(RegionLocation location, MemoryLocation address, address); } var data = requestData(location, address, (int) length); - receivedData.flushingDataFromRegion(location, data); + receivedData.flushingDataFromRegion(location, data, isRecording); } /** diff --git a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/DirectDataGatherer.java b/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/DirectDataGatherer.java index 0f9bd1099d..a13f543b5f 100644 --- a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/DirectDataGatherer.java +++ b/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/DirectDataGatherer.java @@ -131,14 +131,15 @@ private IntBuffer getCoreRegionTable(CoreLocation core, Vertex vertex) } @Override - protected Region getRegion(Placement placement, int regionID) + protected Region getRecordingRegion(Placement placement, int regionID) throws IOException, ProcessException, InterruptedException { var b = getCoreRegionTable(placement.asCoreLocation(), placement.getVertex()); // TODO This is wrong because of shared regions! int size = b.get(regionID + 1) - b.get(regionID); return new Region( - placement, regionID, new MemoryLocation(b.get(regionID)), size); + placement, regionID, new MemoryLocation(b.get(regionID)), size, + true); } @Override diff --git a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/RecordingRegionDataGatherer.java b/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/RecordingRegionDataGatherer.java index d278c42c3b..8a49d60275 100644 --- a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/RecordingRegionDataGatherer.java +++ b/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/RecordingRegionDataGatherer.java @@ -113,12 +113,14 @@ private List getRegions(Placement placement) } @Override - protected Region getRegion(Placement placement, int index) + protected Region getRecordingRegion( + Placement placement, int index) throws IOException, ProcessException, InterruptedException { var region = getRegions(placement).get(index); log.debug("got region of {} R:{} as {}", placement.asCoreLocation(), index, region); - return new Region(placement, index, region.data, (int) region.size); + return new Region(placement, index, region.data, (int) region.size, + true); } @Override @@ -132,7 +134,7 @@ protected void storeData(Region r, ByteBuffer data) { log.info("storing region data for {} R:{} from {} as {} bytes", r.core, r.regionIndex, r.startAddress, data.remaining()); try { - database.appendRecordingContents(r, data); + database.extractRecordingContents(r, data); numWrites++; } catch (StorageException e) { log.error("failed to write to database", e); diff --git a/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/BufferManagerStorage.java b/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/BufferManagerStorage.java index ddc5942063..6f1f794b6d 100644 --- a/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/BufferManagerStorage.java +++ b/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/BufferManagerStorage.java @@ -165,6 +165,14 @@ class Region { @PositiveOrZero public final int finalIgnore; + /** + * States is a recording region + * True if this region is appended after every run/loop. + * False is this region is overwritten aevery run. + */ + @NotNull + public final boolean isRecording; + /** * Create a region descriptor. * @@ -180,9 +188,14 @@ class Region { * @param size * How much data should be downloaded? This is not * necessarily the size of the region. + * @param isRecording + * States is a recording region + * True if this region is appended after every run/loop. + * False is this region is overwritten aevery run. + * */ public Region(HasCoreLocation core, int regionIndex, - MemoryLocation startLocation, int size) { + MemoryLocation startLocation, int size, boolean isRecording) { this.core = core.asCoreLocation(); this.regionIndex = regionIndex; realSize = size; @@ -193,6 +206,7 @@ public Region(HasCoreLocation core, int regionIndex, finalIgnore = (INT_SIZE - (size % INT_SIZE)) % INT_SIZE; size += finalIgnore; this.size = size; + this.isRecording = isRecording; } /** @@ -257,7 +271,7 @@ default int storeRegionContents(Region region, ByteBuffer contents) } /** - * Adds some bytes to the database. The bytes represent part of the contents + * Extract some bytes to the database. The bytes represent the contents * of a recording region of a particular SpiNNaker core. * * @param region @@ -267,26 +281,26 @@ default int storeRegionContents(Region region, ByteBuffer contents) * @throws StorageException * If anything goes wrong. */ - void appendRecordingContents(Region region, byte[] contents) + void extractRecordingContents(Region region, byte[] contents) throws StorageException; void insertMockExtraction() throws StorageException; /** - * Adds some bytes to the database. The bytes represent part of the contents + * Extract some bytes to the database. The bytes represent the contents * of a recording region of a particular SpiNNaker core. * * @param region * The recording region that is being recorded. * @param contents - * The contents to append. + * The contents to extract. * @throws StorageException * If anything goes wrong. */ - default void appendRecordingContents(Region region, ByteBuffer contents) + default void extractRecordingContents(Region region, ByteBuffer contents) throws StorageException { var ary = new byte[contents.remaining()]; contents.slice().get(ary); - appendRecordingContents(region, ary); + extractRecordingContents(region, ary); } } diff --git a/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQL.java b/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQL.java index 5291b0c9e2..710ced064e 100644 --- a/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQL.java +++ b/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQL.java @@ -52,10 +52,11 @@ private SQL() { /** Create a region record. */ @Parameter("core_id") @Parameter("local_region_index") + @Parameter("is_recording") @ResultColumn("region_id") static final String INSERT_REGION = "INSERT INTO " - + "region(core_id, local_region_index)" - + " VALUES (?, ?) RETURNING region_id"; + + "region(core_id, local_region_index, is_recording)" + + " VALUES (?, ?, ?) RETURNING region_id"; /** For testing create an extraction record. diff --git a/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQLiteBufferStorage.java b/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQLiteBufferStorage.java index ce455a2185..6537718bed 100644 --- a/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQLiteBufferStorage.java +++ b/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQLiteBufferStorage.java @@ -117,7 +117,7 @@ private static int getRecordingRegion(Connection conn, int coreID, } try (var s = conn.prepareStatement(INSERT_REGION)) { // core_id, local_region_index, address - setArguments(s, coreID, region.regionIndex); + setArguments(s, coreID, region.regionIndex, region.isRecording); try (var rs = s.executeQuery()) { while (rs.next()) { return rs.getInt("region_id"); @@ -128,6 +128,21 @@ private static int getRecordingRegion(Connection conn, int coreID, "could not make or find recording region record"); } + private static int getExistingRecordingRegion( + Connection conn, int coreID, Region region) throws SQLException { + try (var s = conn.prepareStatement(GET_REGION)) { + // core_id, local_region_index + setArguments(s, coreID, region.regionIndex); + try (var rs = s.executeQuery()) { + while (rs.next()) { + return rs.getInt("region_id"); + } + } + } + throw new IllegalStateException( + "could not find recording region record"); + } + private static int insertMockExtraction(Connection conn) throws SQLException { try (var s = conn.prepareStatement(INSERT_MOCK_EXTRACTION)) { @@ -147,7 +162,7 @@ public void insertMockExtraction() throws StorageException { "Mocking Extraction"); } - private void appendRecordingContents(Connection conn, int regionID, + private void extractRecordingContents(Connection conn, int regionID, int lastExtractionId, byte[] content) throws SQLException { int chunkLen = content.length; var chunk = new ByteArrayInputStream(content); @@ -191,7 +206,7 @@ private int addRegionData(Connection conn, int regionID, int extractionId, } @Override - public void appendRecordingContents(Region region, byte[] contents) + public void extractRecordingContents(Region region, byte[] contents) throws StorageException { // Strip off any prefix and suffix added to make the read aligned byte[] tmp; @@ -201,7 +216,7 @@ public void appendRecordingContents(Region region, byte[] contents) tmp = new byte[region.realSize]; arraycopy(contents, region.initialIgnore, tmp, 0, region.realSize); } - callV(conn -> appendRecordContents(conn, region, tmp), + callV(conn -> extractRecordContents(conn, region, tmp), "creating or adding to a recorded region"); } @@ -216,14 +231,14 @@ public void appendRecordingContents(Region region, byte[] contents) * The bytes to append. * @throws SQLException * If anything goes wrong. - * @see #appendRecordingContents(Region,int,byte[]) + * @see #extractRecordingContents(Region,int,byte[]) */ - private void appendRecordContents(Connection conn, Region region, + private void extractRecordContents(Connection conn, Region region, byte[] contents) throws SQLException { int coreID = getRecordingCore(conn, region.core); int regionID = getRecordingRegion(conn, coreID, region); int lastExtractionId = getLastExtractionId(conn); - appendRecordingContents(conn, regionID, lastExtractionId, contents); + extractRecordingContents(conn, regionID, lastExtractionId, contents); } @Override @@ -238,7 +253,7 @@ private static byte[] getRecordingRegionContents(Connection conn, var accum = new ByteArrayOutputStream(); try { int coreID = getRecordingCore(conn, region.core); - int regionID = getRecordingRegion(conn, coreID, region); + int regionID = getExistingRecordingRegion(conn, coreID, region); try (var s = conn.prepareStatement(FETCH_RECORDING)) { // region_id setArguments(s, regionID); diff --git a/SpiNNaker-storage/src/main/resources/uk/ac/manchester/spinnaker/storage/sqlite/buffer_manager.sql b/SpiNNaker-storage/src/main/resources/uk/ac/manchester/spinnaker/storage/sqlite/buffer_manager.sql index 9297b813ad..f420523e61 100644 --- a/SpiNNaker-storage/src/main/resources/uk/ac/manchester/spinnaker/storage/sqlite/buffer_manager.sql +++ b/SpiNNaker-storage/src/main/resources/uk/ac/manchester/spinnaker/storage/sqlite/buffer_manager.sql @@ -52,17 +52,13 @@ CREATE TABLE IF NOT EXISTS region( core_id INTEGER NOT NULL REFERENCES core(core_id) ON DELETE RESTRICT, local_region_index INTEGER NOT NULL, - address INTEGER, - content BLOB NOT NULL DEFAULT '', - content_len INTEGER DEFAULT 0, - fetches INTEGER NOT NULL DEFAULT 0, - append_time INTEGER); + is_recording INTEGER NOT NULL); -- Every recording region has a unique vertex and index CREATE UNIQUE INDEX IF NOT EXISTS regionSanity ON region( core_id ASC, local_region_index ASC); CREATE VIEW IF NOT EXISTS region_view AS - SELECT core_id, region_id, x, y, processor, local_region_index +SELECT core_id, region_id, x, y, processor, local_region_index, is_recording FROM core NATURAL JOIN region; CREATE TABLE IF NOT EXISTS region_data( @@ -79,11 +75,11 @@ CREATE UNIQUE INDEX IF NOT EXISTS region_data_sanity ON region_data( region_id ASC, extraction_id ASC); CREATE VIEW IF NOT EXISTS region_data_view AS - SELECT core_id, region_id, extraction_id, x, y, processor, local_region_index, - content, content_len +SELECT core_id, region_id, extraction_id, x, y, processor, local_region_index, + content, content_len, is_recording FROM region_view NATURAL JOIN region_data; CREATE VIEW IF NOT EXISTS region_data_plus_view AS - SELECT core_id, region_id, extraction_id, x, y, processor, local_region_index, - content, content_len, run_timestep, run_time_ms, n_run, n_loop, extraction_time +SELECT core_id, region_id, extraction_id, x, y, processor, local_region_index, + content, content_len, is_recording, run_timestep, run_time_ms, n_run, n_loop, extraction_time FROM region_data_view NATURAL JOIN extraction_view; diff --git a/SpiNNaker-storage/src/test/java/uk/ac/manchester/spinnaker/storage/TestSQLiteStorage.java b/SpiNNaker-storage/src/test/java/uk/ac/manchester/spinnaker/storage/TestSQLiteStorage.java index be30c36ae9..e7aa167df0 100644 --- a/SpiNNaker-storage/src/test/java/uk/ac/manchester/spinnaker/storage/TestSQLiteStorage.java +++ b/SpiNNaker-storage/src/test/java/uk/ac/manchester/spinnaker/storage/TestSQLiteStorage.java @@ -59,9 +59,9 @@ void testBasicOps() throws StorageException { assertEquals(List.of(), storage.getCoresWithStorage()); - var rr = new BufferManagerStorage.Region(core, 0, NULL, 100); + var rr = new BufferManagerStorage.Region(core, 0, NULL, 100, true); storage.insertMockExtraction(); - storage.appendRecordingContents(rr, bytes("def")); + storage.extractRecordingContents(rr, bytes("def")); assertArrayEquals("def".getBytes(UTF_8), storage.getRecordingRegionContents(rr)); @@ -75,13 +75,13 @@ void testWithExisting() throws StorageException { var core = new CoreLocation(0, 0, 0); // append creates - var rr = new BufferManagerStorage.Region(core, 1, NULL, 100); + var rr = new BufferManagerStorage.Region(core, 1, NULL, 100, true); storage.insertMockExtraction(); - storage.appendRecordingContents(rr, bytes("ab")); + storage.extractRecordingContents(rr, bytes("ab")); storage.insertMockExtraction(); - storage.appendRecordingContents(rr, bytes("cd")); + storage.extractRecordingContents(rr, bytes("cd")); storage.insertMockExtraction(); - storage.appendRecordingContents(rr, bytes("ef")); + storage.extractRecordingContents(rr, bytes("ef")); assertEquals("abcdef", str(storage.getRecordingRegionContents(rr))); }