Skip to content

Commit

Permalink
merged in remote/region_downloading
Browse files Browse the repository at this point in the history
  • Loading branch information
Christian-B committed Sep 9, 2024
2 parents 2e6c961 + 6d517f5 commit ec1d680
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 194 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,9 @@ private static final class WorkItems {
* theoretically be done in any order... but needs to be processed
* single-threaded anyway.
*/
private final List<List<Region>> regions;
private final List<Region> regions;

WorkItems(Monitor m, List<List<Region>> region) {
WorkItems(Monitor m, List<Region> region) {
this.monitor = m;
this.regions = region;
}
Expand Down Expand Up @@ -257,30 +257,27 @@ private Map<ChipLocation, List<WorkItems>> discoverActualWork(
m.updateTransactionIdFromMachine(txrx);

for (var p : m.getPlacements()) {
var regions = new ArrayList<List<Region>>();
var regions = new ArrayList<Region>();
for (int id : p.getVertex().getRecordedRegionIds()) {
var r = getRegion(p, id);
if (!r.isEmpty()) {
if (r.size > 0) {
regions.add(r);
count += 1;
} else {
storeData(r, allocate(0));
}
count += r.size();
}
for (var dr : p.getVertex().getDownloadRegions()) {
regions.add(List.of(new Region(p, dr.getIndex(),
dr.getAddress(), dr.getSize())));
regions.add(new Region(p, dr.getIndex(),
dr.getAddress(), dr.getSize()));
count++;
}

if (!regions.isEmpty()) {
workitems.add(new WorkItems(m, regions));
}
workitems.add(new WorkItems(m, regions));
}

}
// Totally empty boards can be ignored
if (!workitems.isEmpty()) {
work.put(g.asChipLocation(), workitems);
}
work.put(g.asChipLocation(), workitems);
}
log.info("found {} regions to download", count);
return work;
Expand Down Expand Up @@ -375,19 +372,17 @@ private void fastDownload(List<WorkItems> work,
log.info("processing fast downloads for {}", conn.getChip());
var dl = new Downloader(conn);
for (var item : work) {
for (var regionsOnCore : item.regions) {
for (var region : item.regions) {
/*
* Once there's something too small, all subsequent
* retrieves for that recording region have to be done the
* same way to get the data in the DB in the right order.
*/
for (var region : regionsOnCore) {
var data = dl.doDownload(item.monitor, region);
if (SPINNAKER_COMPARE_DOWNLOAD != null) {
compareDownloadWithSCP(region, data);
}
storeData(region, data);
var data = dl.doDownload(item.monitor, region);
if (SPINNAKER_COMPARE_DOWNLOAD != null) {
compareDownloadWithSCP(region, data);
}
storeData(region, data);
}
}
}
Expand Down Expand Up @@ -491,7 +486,7 @@ private static List<String> describeChunk(Chunk<Byte> chunk) {
*/
@UsedInJavadocOnly(BufferManagerStorage.class)
@ForOverride
protected abstract List<Region> getRegion(Placement placement, int regionID)
protected abstract Region getRegion(Placement placement, int regionID)
throws IOException, ProcessException, StorageException,
InterruptedException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.errorprone.annotations.MustBeClosed;
Expand Down Expand Up @@ -132,14 +131,14 @@ private IntBuffer getCoreRegionTable(CoreLocation core, Vertex vertex)
}

@Override
protected List<Region> getRegion(Placement placement, int regionID)
protected Region getRegion(Placement placement, int regionID)
throws IOException, ProcessException, InterruptedException {
var b = getCoreRegionTable(placement.asCoreLocation(),
placement.getVertex());
// TODO This is wrong because of shared regions!
int size = b.get(regionID + 1) - b.get(regionID);
return List.of(new Region(placement, regionID,
new MemoryLocation(b.get(regionID)), size));
return new Region(
placement, regionID, new MemoryLocation(b.get(regionID)), size);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,12 @@ private List<RecordingRegion> getRegions(Placement placement)
}

@Override
protected List<Region> getRegion(Placement placement, int index)
protected Region getRegion(Placement placement, int index)
throws IOException, ProcessException, InterruptedException {
var region = getRegions(placement).get(index);
log.debug("got region of {} R:{} as {}", placement.asCoreLocation(),
index, region);
if (region.size > 0) {
return List.of(new Region(placement, index, region.data,
(int) region.size));
}
return List.of();
return new Region(placement, index, region.data, (int) region.size);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ default int storeRegionContents(Region region, ByteBuffer contents)
void appendRecordingContents(Region region, byte[] contents)
throws StorageException;

void insertMockExtraction() throws StorageException;

/**
* Adds some bytes to the database. The bytes represent part of the contents
* of a recording region of a particular SpiNNaker core.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,23 @@ private SQL() {
static final String GET_LOCATION = "SELECT core_id FROM core"
+ " WHERE x = ? AND y = ? AND processor = ? LIMIT 1";

/** Create an empty region record. */
/** Create a region record. */
@Parameter("core_id")
@Parameter("local_region_index")
@Parameter("address")
@ResultColumn("region_id")
static final String INSERT_REGION = "INSERT INTO "
+ "region(core_id, local_region_index, address)"
+ " VALUES (?, ?, ?) RETURNING region_id";
+ "region(core_id, local_region_index)"
+ " VALUES (?, ?) RETURNING region_id";


/** For testing create an extraction record.
Would normally be done by python.
*/
@ResultColumn("extraction_id")
static final String INSERT_MOCK_EXTRACTION = "INSERT INTO "
+ "extraction(run_timestep, n_run, n_loop, extract_time) "
+ "VALUES(12345, 1, NULL, 987654) RETURNING extraction_id ";

/** Find an existing region record. */
@Parameter("core_id")
Expand All @@ -65,76 +74,31 @@ private SQL() {
static final String GET_REGION = "SELECT region_id FROM region WHERE "
+ "core_id = ? AND local_region_index = ? LIMIT 1";

/** Append content to a region record. */
@Parameter("content_to_add")
@Parameter("content_len")
@Parameter("append_time")
@Parameter("region_id")
static final String ADD_CONTENT =
"UPDATE region SET content = CAST(? AS BLOB), content_len = ?, "
+ "fetches = 1, append_time = ? WHERE region_id = ?";

/** Prepare a region record for handling content in the extra table. */
@Parameter("append_time")
@Parameter("region_id")
static final String PREP_EXTRA_CONTENT =
"UPDATE region SET fetches = fetches + 1, append_time = ? "
+ "WHERE region_id = ?";
/** Find the current extraction_id. */
@ResultColumn("max_id")
static final String GET_LAST_EXTRACTION_ID =
"SELECT max(extraction_id) as max_id "
+ "FROM extraction LIMIT 1";

/** Add content to a new row in the extra table. */
/** Create a region record. */
@Parameter("region_id")
@Parameter("extraction_id")
@Parameter("content_to_add")
@Parameter("content_len")
@ResultColumn("extra_id")
static final String ADD_EXTRA_CONTENT =
"INSERT INTO region_extra(region_id, content, content_len) "
+ "VALUES (?, CAST(? AS BLOB), ?) RETURNING extra_id";

/**
* Discover whether region in the main region table is available for storing
* data.
*/
@Parameter("region_id")
@ResultColumn("existing")
static final String GET_MAIN_CONTENT_AVAILABLE =
"SELECT COUNT(*) AS existing FROM region "
+ "WHERE region_id = ? AND fetches = 0";

/**
* Determine just how much content there is for a row, overall.
*/
@Parameter("region_id")
@ResultColumn("len")
static final String GET_CONTENT_TOTAL_LENGTH =
"SELECT r.content_len + ("
+ " SELECT SUM(x.content_len) "
+ " FROM region_extra AS x "
+ " WHERE x.region_id = r.region_id"
+ ") AS len FROM region AS r WHERE region_id = ?";

/** Fetch the current variable state of a region record. */
@Parameter("x")
@Parameter("y")
@Parameter("processor")
@Parameter("local_region_index")
@ResultColumn("content")
@ResultColumn("content_len")
@ResultColumn("fetches")
@ResultColumn("append_time")
@ResultColumn("region_id")
static final String FETCH_RECORDING =
"SELECT content, content_len, fetches, append_time, region_id "
+ "FROM region_view"
+ " WHERE x = ? AND y = ? AND processor = ?"
+ " AND local_region_index = ? LIMIT 1";
@ResultColumn("region_data_id")
static final String ADD_REGION_DATA =
"INSERT INTO region_data(region_id, extraction_id, content, "
+ "content_len, missing_data) "
+ "VALUES (?, ?, CAST(? AS BLOB), ?, 0) "
+ "RETURNING region_data_id";

/** Fetch the current variable state of a region record. */
@Parameter("region_id")
@ResultColumn("content")
@ResultColumn("content_len")
static final String FETCH_EXTRA_RECORDING =
"SELECT content, content_len FROM region_extra"
+ " WHERE region_id = ? ORDER BY extra_id ASC";
@ResultColumn("missing_data")
static final String FETCH_RECORDING =
"SELECT content, missing_data FROM region_data "
+ "WHERE region_id = ? ORDER BY extraction_id ASC";

/** List the cores with storage. */
@Parameters({})
Expand Down
Loading

0 comments on commit ec1d680

Please sign in to comment.