Skip to content

Commit

Permalink
Use PipedIn/Out to reduce heap mem load
Browse files Browse the repository at this point in the history
  • Loading branch information
samleeflang committed Jan 26, 2024
1 parent 8303e15 commit 20bd9f6
Showing 1 changed file with 29 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import com.fasterxml.jackson.databind.JsonNode;
import eu.dissco.core.translator.exception.DisscoRepositoryException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -21,22 +22,6 @@ public class BatchInserter {

private final CopyManager copyManager;

/**
 * Bulk-inserts the given records into {@code tableName} using a {@code COPY ... FROM stdin}
 * statement.
 *
 * <p>Every record is serialised to a CSV row and appended to an in-memory buffer; the complete
 * buffer is then handed to the copy manager as a single input stream.
 *
 * @param tableName name of the target table; it is concatenated into the COPY statement as-is
 * @param dbRecords records to insert, one CSV row per element
 * @throws DisscoRepositoryException if buffering the rows or executing the COPY fails
 */
public void batchCopy(String tableName, List<Pair<String, JsonNode>> dbRecords)
    throws DisscoRepositoryException {
  var copyStatement = "COPY " + tableName
      + " FROM stdin DELIMITER ','";
  try (var csvBuffer = new ByteArrayOutputStream()) {
    for (var row : dbRecords) {
      byte[] csvRow = getCsvRow(row);
      csvBuffer.write(csvRow);
    }
    byte[] payload = csvBuffer.toByteArray();
    copyManager.copyIn(copyStatement, new ByteArrayInputStream(payload));
  } catch (IOException | SQLException e) {
    var message = String.format(
        "An error has occurred inserting %d records into temp table %s",
        dbRecords.size(), tableName);
    throw new DisscoRepositoryException(message, e);
  }
}

private static byte[] getCsvRow(Pair<String, JsonNode> dbRecord) {
return (dbRecord.getLeft() + "," +
cleanString(dbRecord.getRight())
Expand All @@ -54,4 +39,31 @@ private static String cleanString(JsonNode jsonNode) {
return node;
}

/**
 * Bulk-inserts the given records into {@code tableName} using a {@code COPY ... FROM stdin}
 * statement.
 *
 * <p>All rows are first serialised to CSV in an in-memory buffer on the calling thread. A helper
 * thread then writes that buffer into a pipe while the calling thread hands the read end of the
 * pipe to the copy manager.
 *
 * <p>A failure in the helper thread is not just logged: it is recorded and rethrown after the
 * COPY returns, so a truncated transfer is never reported as a success.
 *
 * @param tableName name of the target table; it is concatenated into the COPY statement as-is
 * @param dbRecords records to insert, one CSV row per element
 * @throws DisscoRepositoryException if buffering the rows, feeding the pipe, or executing the
 *     COPY fails
 */
public void batchCopy(String tableName, List<Pair<String, JsonNode>> dbRecords)
    throws DisscoRepositoryException {
  try (var outputStream = new ByteArrayOutputStream();
      var in = new PipedInputStream();
      var out = new PipedOutputStream(in)) {
    for (var dbRecord : dbRecords) {
      outputStream.write(getCsvRow(dbRecord));
    }

    // Carries a writer-side failure back to the calling thread.
    var writerFailure = new AtomicReference<IOException>();
    new Thread(() -> {
      // Closing `out` is what signals end-of-stream to the reader, so it must always happen.
      try (out) {
        try {
          outputStream.writeTo(out);
        } catch (IOException e) {
          // Record the failure BEFORE `out` is closed: the reader can only observe EOF after
          // the close, so the failure is guaranteed to be visible once copyIn has returned.
          writerFailure.set(e);
          throw e;
        }
      } catch (IOException e) {
        log.error("Error writing to pipe", e);
      }
    }).start();

    copyManager.copyIn("COPY " + tableName
        + " FROM stdin DELIMITER ','", in);

    // copyIn only saw an EOF; make sure it was a genuine end of data and not an aborted writer.
    var failure = writerFailure.get();
    if (failure != null) {
      throw failure;
    }
  } catch (IOException | SQLException e) {
    throw new DisscoRepositoryException(
        String.format("An error has occurred inserting %d records into temp table %s",
            dbRecords.size(), tableName), e);
  }
}

}

0 comments on commit 20bd9f6

Please sign in to comment.