From 1b75616c0d26a9d3e3aefb1e9d2dd272b6dc7c8b Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Fri, 13 Sep 2024 14:49:27 +0100 Subject: [PATCH 1/3] Seperate recording and download data --- .../transceiver/ReadMemoryProcess.java | 2 +- .../download/BufferedReceivingData.java | 2 +- .../download/RecordingRegionDataGatherer.java | 2 +- .../storage/BufferManagerStorage.java | 27 +-- .../spinnaker/storage/sqlite/SQL.java | 99 ++++++++--- .../storage/sqlite/SQLiteBufferStorage.java | 159 +++++++++++++----- .../storage/sqlite/buffer_manager.sql | 89 +++++++--- .../spinnaker/storage/TestSQLiteStorage.java | 36 +++- 8 files changed, 291 insertions(+), 125 deletions(-) diff --git a/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/ReadMemoryProcess.java b/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/ReadMemoryProcess.java index 430261e1d4..a4e8f4107c 100644 --- a/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/ReadMemoryProcess.java +++ b/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/ReadMemoryProcess.java @@ -357,6 +357,6 @@ void readMemory(BufferManagerStorage.Region region, var buffer = new byte[region.size]; readMemory(region.core.asChipLocation(), region.startAddress, region.size, new BufferAccumulator(buffer)); - storage.extractRecordingContents(region, buffer); + storage.addRecordingContents(region, buffer); } } diff --git a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/BufferedReceivingData.java b/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/BufferedReceivingData.java index e5839ec7b0..7c25c0945f 100644 --- a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/BufferedReceivingData.java +++ b/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/BufferedReceivingData.java @@ -148,7 +148,7 @@ public void storeDataInRegionBuffer(RegionLocation location, data.remaining(), location.region, location.asCoreLocation()); } - storage.extractRecordingContents( + storage.addRecordingContents( new Region(location, location.region, NULL, 0, isRecording), data); } diff --git a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/RecordingRegionDataGatherer.java b/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/RecordingRegionDataGatherer.java index 8a49d60275..8ea2a88059 100644 --- a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/RecordingRegionDataGatherer.java +++ b/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/download/RecordingRegionDataGatherer.java @@ -134,7 +134,7 @@ protected void storeData(Region r, ByteBuffer data) { log.info("storing region data for {} R:{} from {} as {} bytes", r.core, r.regionIndex, r.startAddress, data.remaining()); try { - database.extractRecordingContents(r, data); + database.addRecordingContents(r, data); numWrites++; } catch (StorageException e) { log.error("failed to write to database", e); diff --git a/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/BufferManagerStorage.java b/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/BufferManagerStorage.java index 6f1f794b6d..ae1fc33413 100644 --- a/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/BufferManagerStorage.java +++ b/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/BufferManagerStorage.java @@ -33,25 +33,6 @@ * @author Donal Fellows */ public interface BufferManagerStorage extends ProxyAwareStorage { - /** - * Retrieves some bytes from the database. The bytes represent the contents - * of a DSE region of a particular SpiNNaker core. - * - * @param region - * The region descriptor. - * @return The region contents. - * @throws IllegalArgumentException - * If there's no such saved region. - * @throws StorageException - * If anything goes wrong. - * @throws UnsupportedOperationException - * This method is unsupported. - * @deprecated Currently unsupported; underlying database structure absent - */ - @Deprecated - default byte[] getRegionContents(Region region) throws StorageException { - throw new UnsupportedOperationException(); - } /** * Retrieves some bytes from the database. The bytes represent the contents @@ -65,7 +46,7 @@ default byte[] getRegionContents(Region region) throws StorageException { * @throws StorageException * If anything goes wrong. */ - byte[] getRecordingRegionContents(Region region) throws StorageException; + byte[] getContents(Region region) throws StorageException; /** * Removes some bytes from the database. The bytes represent the contents of @@ -281,7 +262,7 @@ default int storeRegionContents(Region region, ByteBuffer contents) * @throws StorageException * If anything goes wrong. */ - void extractRecordingContents(Region region, byte[] contents) + void addRecordingContents(Region region, byte[] contents) throws StorageException; void insertMockExtraction() throws StorageException; @@ -297,10 +278,10 @@ void extractRecordingContents(Region region, byte[] contents) * @throws StorageException * If anything goes wrong. */ - default void extractRecordingContents(Region region, ByteBuffer contents) + default void addRecordingContents(Region region, ByteBuffer contents) throws StorageException { var ary = new byte[contents.remaining()]; contents.slice().get(ary); - extractRecordingContents(region, ary); + addRecordingContents(region, ary); } } diff --git a/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQL.java b/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQL.java index 710ced064e..3cfd6243c1 100644 --- a/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQL.java +++ b/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQL.java @@ -49,15 +49,21 @@ private SQL() { static final String GET_LOCATION = "SELECT core_id FROM core" + " WHERE x = ? AND y = ? AND processor = ? LIMIT 1"; - /** Create a region record. */ + /** Create a recording region record. */ @Parameter("core_id") @Parameter("local_region_index") - @Parameter("is_recording") - @ResultColumn("region_id") - static final String INSERT_REGION = "INSERT INTO " - + "region(core_id, local_region_index, is_recording)" - + " VALUES (?, ?, ?) RETURNING region_id"; + @ResultColumn("recording_region_id") + static final String INSERT_RECORDING_REGION = "INSERT INTO" + + " recording_region(core_id, local_region_index)" + + " VALUES (?, ?) RETURNING recording_region_id"; + /** Create a download region record. */ + @Parameter("core_id") + @Parameter("local_region_index") + @ResultColumn("download_region_id") + static final String INSERT_DOWNLOAD_REGION = "INSERT INTO" + + " download_region(core_id, local_region_index)" + + " VALUES (?, ?) RETURNING download_region_id"; /** For testing create an extraction record. @@ -68,12 +74,21 @@ private SQL() { + "extraction(run_timestep, n_run, n_loop, extract_time) " + "VALUES(12345, 1, NULL, 987654) RETURNING extraction_id "; - /** Find an existing region record. */ + /** Find an existing recording region record. */ @Parameter("core_id") @Parameter("local_region_index") - @ResultColumn("region_id") - static final String GET_REGION = "SELECT region_id FROM region WHERE " - + "core_id = ? AND local_region_index = ? LIMIT 1"; + @ResultColumn("recoding_region_id") + static final String GET_RECORDING_REGION = "SELECT recording_region_id" + + " FROM recording_region " + + " WHERE core_id = ? AND local_region_index = ? LIMIT 1"; + + /** Find an existing download region record. */ + @Parameter("core_id") + @Parameter("local_region_index") + @ResultColumn("download_region_id") + static final String GET_DOWNLOAD_REGION = "SELECT download_region_id " + + " FROM download_region" + + " WHERE core_id = ? AND local_region_index = ? LIMIT 1"; /** Find the current extraction_id. */ @ResultColumn("max_id") @@ -81,25 +96,46 @@ private SQL() { "SELECT max(extraction_id) as max_id " + "FROM extraction LIMIT 1"; - /** Create a region record. */ - @Parameter("region_id") + /** Create a recoding data record. */ + @Parameter("recording_region_id") @Parameter("extraction_id") @Parameter("content_to_add") @Parameter("content_len") - @ResultColumn("region_data_id") - static final String ADD_REGION_DATA = - "INSERT INTO region_data(region_id, extraction_id, content, " - + "content_len, missing_data) " - + "VALUES (?, ?, CAST(? AS BLOB), ?, 0) " - + "RETURNING region_data_id"; + @ResultColumn("recording_data_id") + static final String ADD_RECORDING_DATA = + "INSERT INTO recording_data(recording_region_id, extraction_id, " + + " content, content_len, missing_data) " + + "VALUES (?, ?, CAST(? AS BLOB), ?, 0) " + + "RETURNING recording_data_id"; + + /** Create a recoding data record. */ + @Parameter("download_region_id") + @Parameter("extraction_id") + @Parameter("content_to_add") + @Parameter("content_len") + @ResultColumn("download_data_id") + static final String ADD_DOWNLOAD_DATA = + "INSERT INTO download_data(download_region_id, extraction_id, " + + " content, content_len, missing_data) " + + "VALUES (?, ?, CAST(? AS BLOB), ?, 0) " + + "RETURNING download_data_id"; + + /** Fetch the current variable state of a region record. */ + @Parameter("recording_region_id") + @ResultColumn("content") + @ResultColumn("missing_data") + static final String GET_RECORDING = + "SELECT content, missing_data FROM recording_data " + + "WHERE recording_region_id = ? ORDER BY extraction_id ASC"; /** Fetch the current variable state of a region record. */ - @Parameter("region_id") + @Parameter("download_region_id") @ResultColumn("content") @ResultColumn("missing_data") - static final String FETCH_RECORDING = - "SELECT content, missing_data FROM region_data " - + "WHERE region_id = ? ORDER BY extraction_id ASC"; + static final String GET_DOWNLOAD = + "SELECT content, missing_data FROM download_data " + + "WHERE download_region_id = ? ORDER BY extraction_id DESC " + + "LIMIT 1"; /** List the cores with storage. */ @Parameters({}) @@ -107,8 +143,10 @@ private SQL() { @ResultColumn("y") @ResultColumn("processor") static final String GET_CORES_WITH_STORAGE = - "SELECT DISTINCT x, y, processor FROM region_view" - + " ORDER BY x, y, processor"; + "SELECT DISTINCT x, y, processor FROM recording_data_view " + + "UNION " + + "SELECT DISTINCT x, y, processor FROM download_data_view " + + "ORDER BY x, y, processor;"; /** List the regions of a core with storage. */ @Parameter("x") @@ -116,9 +154,16 @@ private SQL() { @Parameter("processor") @ResultColumn("local_region_index") static final String GET_REGIONS_WITH_STORAGE = - "SELECT DISTINCT local_region_index FROM region_view" - + " WHERE x = ? AND y = ? AND processor = ?" - + " ORDER BY local_region_index"; + "SELECT DISTINCT local_region_index FROM " + + "(" + + "SELECT local_region_index, x, y, processor " + + "FROM recording_region_view " + + "UNION " + + "SELECT local_region_index, x, y, processor " + + "FROM download_region_view " + + ") " + + "WHERE x = ? AND y = ? AND processor = ? " + + "ORDER BY local_region_index"; // ----------------------------------------------------------------- // Data loading ---------------------------------------------------- diff --git a/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQLiteBufferStorage.java b/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQLiteBufferStorage.java index 6537718bed..f04ed20437 100644 --- a/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQLiteBufferStorage.java +++ b/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQLiteBufferStorage.java @@ -17,16 +17,7 @@ import static java.lang.System.arraycopy; import static org.slf4j.LoggerFactory.getLogger; -import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.ADD_REGION_DATA; -import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.FETCH_RECORDING; -import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.GET_CORES_WITH_STORAGE; -import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.GET_LAST_EXTRACTION_ID; -import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.GET_LOCATION; -import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.GET_REGION; -import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.GET_REGIONS_WITH_STORAGE; -import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.INSERT_LOCATION; -import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.INSERT_MOCK_EXTRACTION; -import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.INSERT_REGION; +import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.*; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -106,21 +97,21 @@ private static int getRecordingCore(Connection conn, CoreLocation core) private static int getRecordingRegion(Connection conn, int coreID, Region region) throws SQLException { - try (var s = conn.prepareStatement(GET_REGION)) { + try (var s = conn.prepareStatement(GET_RECORDING_REGION)) { // core_id, local_region_index setArguments(s, coreID, region.regionIndex); try (var rs = s.executeQuery()) { while (rs.next()) { - return rs.getInt("region_id"); + return rs.getInt("recording_region_id"); } } } - try (var s = conn.prepareStatement(INSERT_REGION)) { + try (var s = conn.prepareStatement(INSERT_RECORDING_REGION)) { // core_id, local_region_index, address - setArguments(s, coreID, region.regionIndex, region.isRecording); + setArguments(s, coreID, region.regionIndex); try (var rs = s.executeQuery()) { while (rs.next()) { - return rs.getInt("region_id"); + return rs.getInt("recording_region_id"); } } } @@ -128,14 +119,53 @@ private static int getRecordingRegion(Connection conn, int coreID, "could not make or find recording region record"); } + private static int getDownloadRegion(Connection conn, int coreID, + Region region) throws SQLException { + try (var s = conn.prepareStatement(GET_DOWNLOAD_REGION)) { + // core_id, local_region_index + setArguments(s, coreID, region.regionIndex); + try (var rs = s.executeQuery()) { + while (rs.next()) { + return rs.getInt("download_region_id"); + } + } + } + try (var s = conn.prepareStatement(INSERT_DOWNLOAD_REGION)) { + // core_id, local_region_index, address + setArguments(s, coreID, region.regionIndex); + try (var rs = s.executeQuery()) { + while (rs.next()) { + return rs.getInt("download_region_id"); + } + } + } + throw new IllegalStateException( + "could not make or find recording region record"); + } + private static int getExistingRecordingRegion( Connection conn, int coreID, Region region) throws SQLException { - try (var s = conn.prepareStatement(GET_REGION)) { + try (var s = conn.prepareStatement(GET_RECORDING_REGION)) { // core_id, local_region_index setArguments(s, coreID, region.regionIndex); try (var rs = s.executeQuery()) { while (rs.next()) { - return rs.getInt("region_id"); + return rs.getInt("recording_region_id"); + } + } + } + throw new IllegalStateException( + "could not find recording region record"); + } + + private static int getExistingDownloadRegion( + Connection conn, int coreID, Region region) throws SQLException { + try (var s = conn.prepareStatement(GET_DOWNLOAD_REGION)) { + // core_id, local_region_index + setArguments(s, coreID, region.regionIndex); + try (var rs = s.executeQuery()) { + while (rs.next()) { + return rs.getInt("download_region_id"); } } } @@ -162,15 +192,6 @@ public void insertMockExtraction() throws StorageException { "Mocking Extraction"); } - private void extractRecordingContents(Connection conn, int regionID, - int lastExtractionId, byte[] content) throws SQLException { - int chunkLen = content.length; - var chunk = new ByteArrayInputStream(content); - log.debug("adding chunk of {} bytes to region data table for region {}", - chunkLen, regionID); - addRegionData(conn, regionID, lastExtractionId, chunkLen, chunk); - } - private static byte[] read(ByteArrayInputStream chunk, int chunkLen) throws SQLException { var buffer = new byte[chunkLen]; @@ -190,15 +211,30 @@ private static byte[] read(ByteArrayInputStream chunk, int chunkLen) return nb; } - private int addRegionData(Connection conn, int regionID, int extractionId, + private int addRecordingData(Connection conn, int regionID, int extractionId, int chunkLen, ByteArrayInputStream chunk) throws SQLException { - try (var s = conn.prepareStatement(ADD_REGION_DATA)) { + try (var s = conn.prepareStatement(ADD_RECORDING_DATA)) { + // region_id, extraction_id, content, content_len, + setArguments( + s, regionID, extractionId, read(chunk, chunkLen), chunkLen); + try (var rs = s.executeQuery()) { + while (rs.next()) { + return rs.getInt("recording_data_id"); + } + } + } + throw new IllegalStateException("no row inserted"); + } + + private int addDownloadData(Connection conn, int regionID, int extractionId, + int chunkLen, ByteArrayInputStream chunk) throws SQLException { + try (var s = conn.prepareStatement(ADD_DOWNLOAD_DATA)) { // region_id, extraction_id, content, content_len, setArguments( s, regionID, extractionId, read(chunk, chunkLen), chunkLen); try (var rs = s.executeQuery()) { while (rs.next()) { - return rs.getInt("region_data_id"); + return rs.getInt("download_data_id"); } } } @@ -206,7 +242,7 @@ private int addRegionData(Connection conn, int regionID, int extractionId, } @Override - public void extractRecordingContents(Region region, byte[] contents) + public void addRecordingContents(Region region, byte[] contents) throws StorageException { // Strip off any prefix and suffix added to make the read aligned byte[] tmp; @@ -216,8 +252,13 @@ public void extractRecordingContents(Region region, byte[] contents) tmp = new byte[region.realSize]; arraycopy(contents, region.initialIgnore, tmp, 0, region.realSize); } - callV(conn -> extractRecordContents(conn, region, tmp), + if (region.isRecording) { + callV(conn -> addRecordingContents(conn, region, tmp), "creating or adding to a recorded region"); + } else { + callV(conn -> addDownloadContents(conn, region, tmp), + "creating or adding to a recorded region"); + } } /** @@ -231,30 +272,70 @@ public void extractRecordingContents(Region region, byte[] contents) * The bytes to append. * @throws SQLException * If anything goes wrong. - * @see #extractRecordingContents(Region,int,byte[]) */ - private void extractRecordContents(Connection conn, Region region, + private void addRecordingContents(Connection conn, Region region, byte[] contents) throws SQLException { int coreID = getRecordingCore(conn, region.core); int regionID = getRecordingRegion(conn, coreID, region); int lastExtractionId = getLastExtractionId(conn); - extractRecordingContents(conn, regionID, lastExtractionId, contents); + int chunkLen = contents.length; + var chunk = new ByteArrayInputStream(contents); + log.debug("adding chunk of {} bytes to region data table for region {}", + chunkLen, regionID); + addRecordingData(conn, regionID, lastExtractionId, chunkLen, chunk); + } + + private void addDownloadContents(Connection conn, Region region, + byte[] contents) throws SQLException { + int coreID = getRecordingCore(conn, region.core); + int regionID = getDownloadRegion(conn, coreID, region); + int lastExtractionId = getLastExtractionId(conn); + int chunkLen = contents.length; + var chunk = new ByteArrayInputStream(contents); + log.debug("adding chunk of {} bytes to region data table for region {}", + chunkLen, regionID); + addDownloadData(conn, regionID, lastExtractionId, chunkLen, chunk); } @Override - public byte[] getRecordingRegionContents(Region region) - throws StorageException { - return callR(conn -> getRecordingRegionContents(conn, region), + public byte[] getContents(Region region) throws StorageException { + if (region.isRecording){ + return callR(conn -> getRecordingContents(conn, region), + "retrieving a recording region"); + } else { + return callR(conn -> getDownloadContents(conn, region), "retrieving a recording region"); + } } - private static byte[] getRecordingRegionContents(Connection conn, + private static byte[] getRecordingContents(Connection conn, Region region) throws SQLException { var accum = new ByteArrayOutputStream(); try { int coreID = getRecordingCore(conn, region.core); int regionID = getExistingRecordingRegion(conn, coreID, region); - try (var s = conn.prepareStatement(FETCH_RECORDING)) { + try (var s = conn.prepareStatement(GET_RECORDING)) { + // region_id + setArguments(s, regionID); + try (var rs = s.executeQuery()) { + while (rs.next()) { + accum.write(rs.getBytes("content")); + } + } + } + } catch (IOException | OutOfMemoryError e) { + throw new RuntimeException("BLOB sequence too large for Java", e); + } + return accum.toByteArray(); + } + + private static byte[] getDownloadContents(Connection conn, + Region region) throws SQLException { + var accum = new ByteArrayOutputStream(); + try { + int coreID = getRecordingCore(conn, region.core); + int regionID = getExistingDownloadRegion(conn, coreID, region); + try (var s = conn.prepareStatement(GET_DOWNLOAD)) { // region_id setArguments(s, regionID); try (var rs = s.executeQuery()) { diff --git a/SpiNNaker-storage/src/main/resources/uk/ac/manchester/spinnaker/storage/sqlite/buffer_manager.sql b/SpiNNaker-storage/src/main/resources/uk/ac/manchester/spinnaker/storage/sqlite/buffer_manager.sql index f420523e61..3505cea036 100644 --- a/SpiNNaker-storage/src/main/resources/uk/ac/manchester/spinnaker/storage/sqlite/buffer_manager.sql +++ b/SpiNNaker-storage/src/main/resources/uk/ac/manchester/spinnaker/storage/sqlite/buffer_manager.sql @@ -47,39 +47,76 @@ CREATE VIEW IF NOT EXISTS extraction_view AS -- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -- A table describing recording regions. -CREATE TABLE IF NOT EXISTS region( - region_id INTEGER PRIMARY KEY AUTOINCREMENT, - core_id INTEGER NOT NULL - REFERENCES core(core_id) ON DELETE RESTRICT, - local_region_index INTEGER NOT NULL, - is_recording INTEGER NOT NULL); +CREATE TABLE IF NOT EXISTS recording_region( + recording_region_id INTEGER PRIMARY KEY AUTOINCREMENT, + core_id INTEGER NOT NULL + REFERENCES core(core_id) ON DELETE RESTRICT, + local_region_index INTEGER NOT NULL); -- Every recording region has a unique vertex and index -CREATE UNIQUE INDEX IF NOT EXISTS regionSanity ON region( +CREATE UNIQUE INDEX IF NOT EXISTS recording_region_sanity ON recording_region( core_id ASC, local_region_index ASC); -CREATE VIEW IF NOT EXISTS region_view AS -SELECT core_id, region_id, x, y, processor, local_region_index, is_recording -FROM core NATURAL JOIN region; +CREATE VIEW IF NOT EXISTS recording_region_view AS +SELECT core_id, recording_region_id, x, y, processor, local_region_index +FROM core NATURAL JOIN recording_region; -CREATE TABLE IF NOT EXISTS region_data( - region_data_id INTEGER PRIMARY KEY AUTOINCREMENT, - region_id INTEGER NOT NULL - REFERENCES region(region_id) ON DELETE RESTRICT, - extraction_id INTEGER NOT NULL - REFERENCES extraction(extraction_id) ON DELETE RESTRICT, +CREATE TABLE IF NOT EXISTS recording_data( + recording_data_id INTEGER PRIMARY KEY AUTOINCREMENT, + recording_region_id INTEGER NOT NULL + REFERENCES recording_region(recording_region_id) ON DELETE RESTRICT, + extraction_id INTEGER NOT NULL + REFERENCES extraction(extraction_id) ON DELETE RESTRICT, content BLOB NOT NULL, content_len INTEGER NOT NULL, - missing_data INTEGER NOT NULL); + missing_data INTEGER NOT NULL); -- Every recording region is extracted once per BefferExtractor run -CREATE UNIQUE INDEX IF NOT EXISTS region_data_sanity ON region_data( - region_id ASC, extraction_id ASC); +CREATE UNIQUE INDEX IF NOT EXISTS recording_data_sanity ON recording_data( + recording_region_id ASC, extraction_id ASC); -CREATE VIEW IF NOT EXISTS region_data_view AS -SELECT core_id, region_id, extraction_id, x, y, processor, local_region_index, - content, content_len, is_recording -FROM region_view NATURAL JOIN region_data; +CREATE VIEW IF NOT EXISTS recording_data_view AS +SELECT core_id, recording_region_id, extraction_id, x, y, processor, local_region_index, + content, content_len +FROM recording_region_view NATURAL JOIN recording_data; -CREATE VIEW IF NOT EXISTS region_data_plus_view AS +CREATE VIEW IF NOT EXISTS recording_data_plus_view AS SELECT core_id, region_id, extraction_id, x, y, processor, local_region_index, - content, content_len, is_recording, run_timestep, run_time_ms, n_run, n_loop, extraction_time -FROM region_data_view NATURAL JOIN extraction_view; + content, content_len, run_timestep, run_time_ms, n_run, n_loop, extraction_time +FROM recording_data_view NATURAL JOIN extraction_view; + +-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +-- A table describing download regions. +CREATE TABLE IF NOT EXISTS download_region( + download_region_id INTEGER PRIMARY KEY AUTOINCREMENT, + core_id INTEGER NOT NULL + REFERENCES core(core_id) ON DELETE RESTRICT, + local_region_index INTEGER NOT NULL); +-- Every recording region has a unique vertex and index +CREATE UNIQUE INDEX IF NOT EXISTS download_region_sanity ON download_region( + core_id ASC, local_region_index ASC); + +CREATE VIEW IF NOT EXISTS download_region_view AS +SELECT core_id, download_region_id, x, y, processor, local_region_index +FROM core NATURAL JOIN download_region; + +CREATE TABLE IF NOT EXISTS download_data( + download_data_id INTEGER PRIMARY KEY AUTOINCREMENT, + download_region_id INTEGER NOT NULL + REFERENCES download_region(download_region_id) ON DELETE RESTRICT, + extraction_id INTEGER NOT NULL + REFERENCES extraction(extraction_id) ON DELETE RESTRICT, + content BLOB NOT NULL, + content_len INTEGER NOT NULL, + missing_data INTEGER NOT NULL); +-- Every recording region is extracted once per BefferExtractor run +CREATE UNIQUE INDEX IF NOT EXISTS download_data_sanity ON download_data( + download_region_id ASC, extraction_id ASC); + +CREATE VIEW IF NOT EXISTS download_data_view AS +SELECT core_id, download_region_id, extraction_id, x, y, processor, local_region_index, + content, content_len +FROM download_region_view NATURAL JOIN download_data; + +CREATE VIEW IF NOT EXISTS download_data_plus_view AS +SELECT core_id, download_region_id, extraction_id, x, y, processor, local_region_index, + content, content_len, run_timestep, run_time_ms, n_run, n_loop, extraction_time +FROM download_data_view NATURAL JOIN extraction_view; diff --git a/SpiNNaker-storage/src/test/java/uk/ac/manchester/spinnaker/storage/TestSQLiteStorage.java b/SpiNNaker-storage/src/test/java/uk/ac/manchester/spinnaker/storage/TestSQLiteStorage.java index e7aa167df0..b03211324d 100644 --- a/SpiNNaker-storage/src/test/java/uk/ac/manchester/spinnaker/storage/TestSQLiteStorage.java +++ b/SpiNNaker-storage/src/test/java/uk/ac/manchester/spinnaker/storage/TestSQLiteStorage.java @@ -53,7 +53,7 @@ private static String str(byte[] bytes) { } @Test - void testBasicOps() throws StorageException { + void testRecording() throws StorageException { var storage = new BufferManagerDatabaseEngine(db).getStorageInterface(); var core = new CoreLocation(0, 0, 0); @@ -61,9 +61,26 @@ void testBasicOps() throws StorageException { var rr = new BufferManagerStorage.Region(core, 0, NULL, 100, true); storage.insertMockExtraction(); - storage.extractRecordingContents(rr, bytes("def")); + storage.addRecordingContents(rr, bytes("def")); assertArrayEquals("def".getBytes(UTF_8), - storage.getRecordingRegionContents(rr)); + storage.getContents(rr)); + + assertEquals(List.of(core), storage.getCoresWithStorage()); + assertEquals(List.of(0), storage.getRegionsWithStorage(core)); + } + + @Test + void testDownload() throws StorageException { + var storage = new BufferManagerDatabaseEngine(db).getStorageInterface(); + var core = new CoreLocation(0, 0, 0); + + assertEquals(List.of(), storage.getCoresWithStorage()); + + var rr = new BufferManagerStorage.Region(core, 0, NULL, 100, false); + storage.insertMockExtraction(); + storage.addRecordingContents(rr, bytes("def")); + assertArrayEquals("def".getBytes(UTF_8), + storage.getContents(rr)); assertEquals(List.of(core), storage.getCoresWithStorage()); assertEquals(List.of(0), storage.getRegionsWithStorage(core)); @@ -76,13 +93,18 @@ void testWithExisting() throws StorageException { // append creates var rr = new BufferManagerStorage.Region(core, 1, NULL, 100, true); + var rd = new BufferManagerStorage.Region(core, 1, NULL, 100, false); storage.insertMockExtraction(); - storage.extractRecordingContents(rr, bytes("ab")); + storage.addRecordingContents(rr, bytes("ab")); + storage.addRecordingContents(rd, bytes("AB")); storage.insertMockExtraction(); - storage.extractRecordingContents(rr, bytes("cd")); + storage.addRecordingContents(rr, bytes("cd")); + storage.addRecordingContents(rd, bytes("CD")); storage.insertMockExtraction(); - storage.extractRecordingContents(rr, bytes("ef")); - assertEquals("abcdef", str(storage.getRecordingRegionContents(rr))); + storage.addRecordingContents(rr, bytes("ef")); + storage.addRecordingContents(rd, bytes("EF")); + assertEquals("abcdef", str(storage.getContents(rr))); + assertEquals("EF", str(storage.getContents(rd))); } } From 6a5fa02ed489a94d2d4523bb7ed3f6e964e4f389 Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Fri, 13 Sep 2024 15:27:07 +0100 Subject: [PATCH 2/3] style --- .../storage/sqlite/SQLiteBufferStorage.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQLiteBufferStorage.java b/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQLiteBufferStorage.java index f04ed20437..e449f73d8d 100644 --- a/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQLiteBufferStorage.java +++ b/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQLiteBufferStorage.java @@ -120,7 +120,7 @@ private static int getRecordingRegion(Connection conn, int coreID, } private static int getDownloadRegion(Connection conn, int coreID, - Region region) throws SQLException { + Region region) throws SQLException { try (var s = conn.prepareStatement(GET_DOWNLOAD_REGION)) { // core_id, local_region_index setArguments(s, coreID, region.regionIndex); @@ -211,8 +211,9 @@ private static byte[] read(ByteArrayInputStream chunk, int chunkLen) return nb; } - private int addRecordingData(Connection conn, int regionID, int extractionId, - int chunkLen, ByteArrayInputStream chunk) throws SQLException { + private int addRecordingData(Connection conn, int regionID, + int extractionId, int chunkLen, + ByteArrayInputStream chunk) throws SQLException { try (var s = conn.prepareStatement(ADD_RECORDING_DATA)) { // region_id, extraction_id, content, content_len, setArguments( @@ -227,7 +228,7 @@ private int addRecordingData(Connection conn, int regionID, int extractionId, } private int addDownloadData(Connection conn, int regionID, int extractionId, - int chunkLen, ByteArrayInputStream chunk) throws SQLException { + int chunkLen, ByteArrayInputStream chunk) throws SQLException { try (var s = conn.prepareStatement(ADD_DOWNLOAD_DATA)) { // region_id, extraction_id, content, content_len, setArguments( @@ -286,7 +287,7 @@ private void addRecordingContents(Connection conn, Region region, } private void addDownloadContents(Connection conn, Region region, - byte[] contents) throws SQLException { + byte[] contents) throws SQLException { int coreID = getRecordingCore(conn, region.core); int regionID = getDownloadRegion(conn, coreID, region); int lastExtractionId = getLastExtractionId(conn); @@ -299,7 +300,7 @@ private void addDownloadContents(Connection conn, Region region, @Override public byte[] getContents(Region region) throws StorageException { - if (region.isRecording){ + if (region.isRecording) { return callR(conn -> getRecordingContents(conn, region), "retrieving a recording region"); } else { @@ -329,8 +330,8 @@ private static byte[] getRecordingContents(Connection conn, return accum.toByteArray(); } - private static byte[] getDownloadContents(Connection conn, - Region region) throws SQLException { + private static byte[] getDownloadContents( + Connection conn, Region region) throws SQLException { var accum = new ByteArrayOutputStream(); try { int coreID = getRecordingCore(conn, region.core); From b2eddac65e5aae97c2aea5ff4d86159921bdf88d Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Fri, 13 Sep 2024 15:51:54 +0100 Subject: [PATCH 3/3] list references --- .../storage/sqlite/SQLiteBufferStorage.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQLiteBufferStorage.java b/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQLiteBufferStorage.java index e449f73d8d..7d11419072 100644 --- a/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQLiteBufferStorage.java +++ b/SpiNNaker-storage/src/main/java/uk/ac/manchester/spinnaker/storage/sqlite/SQLiteBufferStorage.java @@ -17,7 +17,20 @@ import static java.lang.System.arraycopy; import static org.slf4j.LoggerFactory.getLogger; -import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.*; +import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.ADD_DOWNLOAD_DATA; +import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.ADD_RECORDING_DATA; +import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.GET_CORES_WITH_STORAGE; +import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.GET_DOWNLOAD; +import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.GET_DOWNLOAD_REGION; +import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.GET_LAST_EXTRACTION_ID; +import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.GET_LOCATION; +import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.GET_RECORDING_REGION; +import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.GET_RECORDING; +import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.GET_REGIONS_WITH_STORAGE; +import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.INSERT_LOCATION; +import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.INSERT_DOWNLOAD_REGION; +import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.INSERT_RECORDING_REGION; +import static uk.ac.manchester.spinnaker.storage.sqlite.SQL.INSERT_MOCK_EXTRACTION; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream;