[DO NOT MERGE!] SNOW-1176557 Support max lob size to 128MB in client SDK #877

Open · wants to merge 3 commits into base: master
src/main/java/net/snowflake/ingest/streaming/example/SnowflakeStreamingIngestLargeLobExample.java (new file)
@@ -0,0 +1,104 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.example;


import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import net.snowflake.client.jdbc.internal.apache.tika.utils.StringUtils;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;

/**
* Example of how to use the Streaming Ingest client APIs with large LOB values.
*
* <p>Please read the README.md file for detailed steps
*/
public class SnowflakeStreamingIngestLargeLobExample {
// Please follow the example in profile_streaming.json.example to see the required properties, or
// if you have already set up profile.json with Snowpipe before, all you need is to add the "role"
// property. If the "role" is not specified, the default user role will be applied.
private static String PROFILE_PATH = "profile.json";
private static final ObjectMapper mapper = new ObjectMapper();

public static void main(String[] args) throws Exception {
Properties props = new Properties();
Iterator<Map.Entry<String, JsonNode>> propIt =
mapper.readTree(new String(Files.readAllBytes(Paths.get(PROFILE_PATH)))).fields();
while (propIt.hasNext()) {
Map.Entry<String, JsonNode> prop = propIt.next();
props.put(prop.getKey(), prop.getValue().asText());
}

// Create a streaming ingest client
try (SnowflakeStreamingIngestClient client =
SnowflakeStreamingIngestClientFactory.builder("MY_CLIENT").setProperties(props).build()) {

// Create an open channel request on table MY_TABLE; note that the corresponding
// db/schema/table needs to be present
// Example: create or replace table MY_TABLE(c1 varchar, c2 binary, c3 variant, c4 variant);
OpenChannelRequest request1 =
OpenChannelRequest.builder("MY_CHANNEL")
.setDBName("MY_DATABASE")
.setSchemaName("MY_SCHEMA")
.setTableName("MY_TABLE")
.setOnErrorOption(
OpenChannelRequest.OnErrorOption.CONTINUE) // Another ON_ERROR option is ABORT
.build();

// Open a streaming ingest channel from the given client
SnowflakeStreamingIngestChannel channel1 = client.openChannel(request1);

// Insert rows into the channel (Using insertRows API)
final int totalRowsInTable = 10;
for (int val = 0; val < totalRowsInTable; val++) {
Map<String, Object> row = new HashMap<>();

// The keys are the column names in the table; the values are the large LOB payloads
row.put("c1", StringUtils.repeat("a", 127 * 1024 * 1024) + val);
row.put("c2", new byte[60 * 1024 * 1024]);
row.put("c3", "{\"a\":\"" + StringUtils.repeat("a", 127 * 1024 * 1024) + "\"}");
row.put("c4", "{\"a\":\"" + StringUtils.repeat("a", 127 * 1024 * 1024) + "\"}");

// Insert the row with the current offset_token
InsertValidationResponse response = channel1.insertRow(row, String.valueOf(val));
if (response.hasErrors()) {
// Simply throw if there is an exception, or you can do whatever you want with the
// erroneous row
throw response.getInsertErrors().get(0).getException();
}
}

// If needed, you can check the offset_token registered in Snowflake to make sure everything
// is committed
final int expectedOffsetTokenInSnowflake = totalRowsInTable - 1; // 0 based offset_token
final int maxRetries = 10;
int retryCount = 0;

do {
String offsetTokenFromSnowflake = channel1.getLatestCommittedOffsetToken();
if (offsetTokenFromSnowflake != null
&& offsetTokenFromSnowflake.equals(String.valueOf(expectedOffsetTokenInSnowflake))) {
System.out.println("SUCCESSFULLY inserted " + totalRowsInTable + " rows");
break;
}
retryCount++;
} while (retryCount < maxRetries);

// Close the channel; this internally makes sure everything is committed (or throws an
// exception if there is any issue)
channel1.close().get();
}
}
}
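Not part of the diff, but a sketch of creating the table this example expects over JDBC may help when trying it out. The account URL, credentials, and warehouse below are placeholders, and the 128MB VARCHAR / 64MB BINARY column sizes assume an account where the increased LOB limits are enabled (as in the integration-test setup later in this PR):

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class CreateLargeLobTableSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("user", "MY_USER");         // placeholder credentials
    props.put("password", "MY_PASSWORD"); // placeholder credentials
    props.put("db", "MY_DATABASE");
    props.put("schema", "MY_SCHEMA");
    props.put("warehouse", "MY_WAREHOUSE");

    // Placeholder account URL; requires the Snowflake JDBC driver on the classpath.
    try (Connection conn =
        DriverManager.getConnection("jdbc:snowflake://MY_ACCOUNT.snowflakecomputing.com", props)) {
      // Column sizes assume the increased LOB limits are enabled on the account.
      conn.createStatement()
          .execute(
              "create or replace table MY_TABLE("
                  + "c1 varchar(134217728), c2 binary(67108864), c3 variant, c4 variant)");
    }
  }
}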
src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java
@@ -12,6 +12,7 @@
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
@@ -69,17 +70,23 @@ class DataValidationUtil {
*/
private static final long MICROSECONDS_LIMIT_FOR_EPOCH = SECONDS_LIMIT_FOR_EPOCH * 1000000L;

public static final int BYTES_8_MB = 8 * 1024 * 1024;
public static final int BYTES_16_MB = 2 * BYTES_8_MB;
public static final int BYTES_64_MB = 64 * 1024 * 1024;
public static final int BYTES_128_MB = 2 * BYTES_64_MB;

// TODO SNOW-664249: There is a few-byte mismatch between the value sent by the user and its
// server-side representation. Validation leaves a small buffer for this difference.
static final int MAX_SEMI_STRUCTURED_LENGTH = BYTES_16_MB - 64;
static final int MAX_SEMI_STRUCTURED_LENGTH = BYTES_128_MB - 64;

private static final ObjectMapper objectMapper = new ObjectMapper();

private static final JsonFactory factory = new JsonFactory();

// set the max length to 128 MB
static {
factory.setStreamReadConstraints(
StreamReadConstraints.builder().maxStringLength(BYTES_128_MB).build());
}

// The version of Jackson we are using does not support serialization of date objects from the
// java.time package. Here we define a module with custom java.time serializers. Additionally, we
// define custom serializer for byte[] because the Jackson default is to serialize it as
@@ -670,13 +677,14 @@ static String validateAndParseString(
}
byte[] utf8Bytes = output.getBytes(StandardCharsets.UTF_8);

// Strings can never be larger than 16MB
if (utf8Bytes.length > BYTES_16_MB) {
// Strings can never be larger than 128MB
if (utf8Bytes.length > BYTES_128_MB) {
throw valueFormatNotAllowedException(
columnName,
"STRING",
String.format(
"String too long: length=%d bytes maxLength=%d bytes", utf8Bytes.length, BYTES_16_MB),
"String too long: length=%d bytes maxLength=%d bytes",
utf8Bytes.length, BYTES_128_MB),
insertRowIndex);
}

@@ -815,7 +823,7 @@ static byte[] validateAndParseBinary(
insertRowIndex);
}

int maxLength = maxLengthOptional.orElse(BYTES_8_MB);
int maxLength = maxLengthOptional.orElse(BYTES_64_MB);
if (output.length > maxLength) {
throw valueFormatNotAllowedException(
columnName,
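For context on the new static block in DataValidationUtil: Jackson's default StreamReadConstraints cap the length of an individual string token (roughly 20 million characters in recent 2.15+ releases), so a 128MB semi-structured value would be rejected during parsing unless the limit is raised. A minimal standalone sketch of the same pattern, not part of this PR; the class name and the 30-million-character payload are illustrative only:

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JacksonStringLimitSketch {
  public static void main(String[] args) throws Exception {
    // Raise the per-token string limit to 128MB, mirroring the static block above.
    JsonFactory factory = new JsonFactory();
    factory.setStreamReadConstraints(
        StreamReadConstraints.builder().maxStringLength(128 * 1024 * 1024).build());
    ObjectMapper mapper = new ObjectMapper(factory);

    // Build a JSON document whose single string value exceeds the default limit.
    StringBuilder sb = new StringBuilder("{\"a\":\"");
    for (int i = 0; i < 30_000_000; i++) {
      sb.append('a');
    }
    sb.append("\"}");

    // Parses with the raised limit; with the default constraints Jackson would
    // reject the oversized string value while reading the token.
    System.out.println(mapper.readTree(sb.toString()).get("a").textValue().length());
  }
}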
src/main/java/net/snowflake/ingest/utils/Constants.java (4 changes: 2 additions & 2 deletions)
@@ -64,8 +64,8 @@ public class Constants {
public static final long EP_NDV_UNKNOWN = -1L;
public static final long EP_NV_UNKNOWN = -1L;
public static final int MAX_OAUTH_REFRESH_TOKEN_RETRY = 3;
public static final int BINARY_COLUMN_MAX_SIZE = 8 * 1024 * 1024;
public static final int VARCHAR_COLUMN_MAX_SIZE = 16 * 1024 * 1024;
public static final int BINARY_COLUMN_MAX_SIZE = 64 * 1024 * 1024;
public static final int VARCHAR_COLUMN_MAX_SIZE = 128 * 1024 * 1024;

// Channel level constants
public static final String CHANNEL_STATUS_ENDPOINT = "/v1/streaming/channels/status/";
src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
@@ -230,6 +230,13 @@ private void setParameterMap(
props,
false /* enforceDefault */);

this.checkAndUpdate(
MAX_ALLOWED_ROW_SIZE_IN_BYTES,
MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT,
parameterOverrides,
props,
false /* enforceDefault */);

[Review comment from the Contributor Author] @sfc-gh-tzhang do you know why in the past we had the parameter here but the customer could not set it? Is there an explicit reason, or was the implementation just missing? Thanks!
this.checkAndUpdate(
MAX_CLIENT_LAG,
isIcebergMode ? MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT : MAX_CLIENT_LAG_DEFAULT,
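For illustration, a sketch of how a caller might raise the row-size cap now that MAX_ALLOWED_ROW_SIZE_IN_BYTES is registered in setParameterMap. It mirrors the parameterMap used in the integration-test setup later in this PR; the setParameterOverrides builder call and the 512MB value are assumptions, not something this diff adds:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
import net.snowflake.ingest.utils.ParameterProvider;

public class LargeRowClientSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties(); // populate from profile.json as in the example above

    // Allow rows of up to 4 x 128MB; the value is only an example.
    Map<String, Object> parameterOverrides = new HashMap<>();
    parameterOverrides.put(
        ParameterProvider.MAX_ALLOWED_ROW_SIZE_IN_BYTES, 4L * 128 * 1024 * 1024);

    try (SnowflakeStreamingIngestClient client =
        SnowflakeStreamingIngestClientFactory.builder("MY_CLIENT")
            .setProperties(props)
            .setParameterOverrides(parameterOverrides) // assumption: override hook on the builder
            .build()) {
      // open channels and insert large rows as shown in the example file above
    }
  }
}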
@@ -6,8 +6,8 @@

import static java.time.ZoneOffset.UTC;
import static net.snowflake.ingest.TestUtils.buildString;
import static net.snowflake.ingest.streaming.internal.DataValidationUtil.BYTES_16_MB;
import static net.snowflake.ingest.streaming.internal.DataValidationUtil.BYTES_8_MB;
import static net.snowflake.ingest.streaming.internal.DataValidationUtil.BYTES_128_MB;
import static net.snowflake.ingest.streaming.internal.DataValidationUtil.BYTES_64_MB;
import static net.snowflake.ingest.streaming.internal.DataValidationUtil.isAllowedSemiStructuredType;
import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseArray;
import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseArrayNew;
@@ -451,11 +451,11 @@ public void testValidateAndParseString() {
assertEquals("honk", validateAndParseString("COL", "honk", Optional.empty(), 0));

// Check max byte length
String maxString = buildString("a", BYTES_16_MB);
String maxString = buildString("a", BYTES_128_MB);
assertEquals(maxString, validateAndParseString("COL", maxString, Optional.empty(), 0));

// max byte length - 1 should also succeed
String maxStringMinusOne = buildString("a", BYTES_16_MB - 1);
String maxStringMinusOne = buildString("a", BYTES_128_MB - 1);
assertEquals(
maxStringMinusOne, validateAndParseString("COL", maxStringMinusOne, Optional.empty(), 0));

@@ -763,7 +763,7 @@ public void testValidateAndParseObject() throws Exception {

final String tooLargeObject =
objectMapper.writeValueAsString(
Collections.singletonMap("key", StringUtils.repeat('a', 20000000)));
Collections.singletonMap("key", StringUtils.repeat('a', 128 * 1024 * 1024)));
expectError(
ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseObject("COL", tooLargeObject, 0));
expectError(
@@ -858,7 +858,7 @@ public void testValidateAndParseObject() throws Exception {

@Test
public void testTooLargeVariant() {
char[] stringContent = new char[16 * 1024 * 1024 - 16]; // {"a":"11","b":""}
char[] stringContent = new char[128 * 1024 * 1024 - 16]; // {"a":"11","b":""}
Arrays.fill(stringContent, 'c');

// {"a":"11","b":""}
@@ -873,7 +873,7 @@ public void testTooLargeVariant() {
@Test
public void testTooLargeMultiByteSemiStructuredValues() {
// Variant max size is not in characters, but in bytes
char[] stringContent = new char[9 * 1024 * 1024]; // 8MB < value < 16MB
char[] stringContent = new char[90 * 1024 * 1024]; // 64MB < value < 128MB
Arrays.fill(stringContent, 'Č');

Map<String, Object> m = new HashMap<>();
Expand All @@ -882,19 +882,19 @@ public void testTooLargeMultiByteSemiStructuredValues() {
ErrorCode.INVALID_VALUE_ROW,
"The given row cannot be converted to the internal format due to invalid value: Value"
+ " cannot be ingested into Snowflake column COL of type VARIANT, rowIndex:0, reason:"
+ " Variant too long: length=18874376 maxLength=16777152",
+ " Variant too long: length=188743688 maxLength=134217664",
() -> validateAndParseVariant("COL", m, 0));
expectErrorCodeAndMessage(
ErrorCode.INVALID_VALUE_ROW,
"The given row cannot be converted to the internal format due to invalid value: Value"
+ " cannot be ingested into Snowflake column COL of type ARRAY, rowIndex:0, reason:"
+ " Array too large. length=18874378 maxLength=16777152",
+ " Array too large. length=188743690 maxLength=134217664",
() -> validateAndParseArray("COL", m, 0));
expectErrorCodeAndMessage(
ErrorCode.INVALID_VALUE_ROW,
"The given row cannot be converted to the internal format due to invalid value: Value"
+ " cannot be ingested into Snowflake column COL of type OBJECT, rowIndex:0, reason:"
+ " Object too large. length=18874376 maxLength=16777152",
+ " Object too large. length=188743688 maxLength=134217664",
() -> validateAndParseObject("COL", m, 0));
}
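A quick cross-check of the expected lengths above, assuming 'Č' encodes to two bytes in UTF-8 (the class name is illustrative):

public class SemiStructuredLimitArithmetic {
  public static void main(String[] args) {
    long content = 90L * 1024 * 1024 * 2;        // raw string content: 188,743,680 bytes
    System.out.println(content + 8);             // variant/object {"a":"..."}  -> 188,743,688
    System.out.println(content + 10);            // array [{"a":"..."}]         -> 188,743,690
    System.out.println(128L * 1024 * 1024 - 64); // MAX_SEMI_STRUCTURED_LENGTH  -> 134,217,664
  }
}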

@@ -1056,8 +1056,8 @@ public void testValidVariantType() {

@Test
public void testValidateAndParseBinary() throws DecoderException {
byte[] maxAllowedArray = new byte[BYTES_8_MB];
byte[] maxAllowedArrayMinusOne = new byte[BYTES_8_MB - 1];
byte[] maxAllowedArray = new byte[BYTES_64_MB];
byte[] maxAllowedArrayMinusOne = new byte[BYTES_64_MB - 1];

assertArrayEquals(
"honk".getBytes(StandardCharsets.UTF_8),
@@ -1094,7 +1094,7 @@ public void testValidateAndParseBinary() throws DecoderException {
() -> validateAndParseBinary("COL", new byte[1], Optional.of(0), 0));
expectError(
ErrorCode.INVALID_VALUE_ROW,
() -> validateAndParseBinary("COL", new byte[BYTES_8_MB + 1], Optional.empty(), 0));
() -> validateAndParseBinary("COL", new byte[BYTES_64_MB + 1], Optional.empty(), 0));
expectError(
ErrorCode.INVALID_VALUE_ROW,
() -> validateAndParseBinary("COL", new byte[8], Optional.of(7), 0));
@@ -5,6 +5,7 @@
package net.snowflake.ingest.streaming.internal;

import static java.time.ZoneOffset.UTC;
import static net.snowflake.ingest.streaming.internal.DataValidationUtil.BYTES_128_MB;
import static net.snowflake.ingest.utils.Constants.EP_NV_UNKNOWN;
import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT;
import static net.snowflake.ingest.utils.ParameterProvider.MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT;
@@ -395,15 +396,15 @@ public void testRowIndexWithMultipleRowsWithErrorHelper(AbstractRowBuffer<?> row
rows.add(row);

row = new HashMap<>();
row.put("colChar", StringUtils.repeat('1', 16777217)); // too big
row.put("colChar", StringUtils.repeat('1', BYTES_128_MB + 1)); // too big
rows.add(row);

row = new HashMap<>();
row.put("colInt", 3);
rows.add(row);

row = new HashMap<>();
row.put("colChar", StringUtils.repeat('1', 16777217)); // too big
row.put("colChar", StringUtils.repeat('1', BYTES_128_MB + 1)); // too big
rows.add(row);

InsertValidationResponse response = rowBuffer.insertRows(rows, null, null);
@@ -435,8 +436,8 @@ public void testRowIndexWithMultipleRowsWithErrorHelper(AbstractRowBuffer<?> row
.equalsIgnoreCase(
"The given row cannot be converted to the internal format due to invalid value:"
+ " Value cannot be ingested into Snowflake column COLCHAR of type STRING,"
+ " rowIndex:1, reason: String too long: length=16777217 bytes"
+ " maxLength=16777216 bytes"));
+ " rowIndex:1, reason: String too long: length=134217729 bytes"
+ " maxLength=134217728 bytes"));
Assert.assertTrue(
response
.getInsertErrors()
@@ -446,8 +447,8 @@ public void testRowIndexWithMultipleRowsWithErrorHelper(AbstractRowBuffer<?> row
.equalsIgnoreCase(
"The given row cannot be converted to the internal format due to invalid value:"
+ " Value cannot be ingested into Snowflake column COLCHAR of type STRING,"
+ " rowIndex:3, reason: String too long: length=16777217 bytes"
+ " maxLength=16777216 bytes"));
+ " rowIndex:3, reason: String too long: length=134217729 bytes"
+ " maxLength=134217728 bytes"));
}

private void testStringLengthHelper(AbstractRowBuffer<?> rowBuffer) {
@@ -42,6 +42,9 @@

@RunWith(Parameterized.class)
public abstract class AbstractDataTypeTest {
protected static final int MB_64 = 64 * 1024 * 1024;
protected static final int MB_128 = 128 * 1024 * 1024;

private static final String SOURCE_COLUMN_NAME = "source";
private static final String VALUE_COLUMN_NAME = "value";

@@ -101,6 +104,15 @@ protected void setUp(
conn.createStatement().execute(String.format("use database %s;", databaseName));
conn.createStatement().execute(String.format("use schema %s;", schemaName));

// Set up the session parameters required for the increased max LOB size
conn.createStatement()
.execute(
"alter session set FEATURE_INCREASED_MAX_LOB_SIZE_IN_MEMORY = enabled,"
+ " FEATURE_INCREASED_MAX_LOB_SIZE_PERSISTED=enabled,"
+ " ENABLE_ISMAXLENGTH_FIELD_IN_DATATYPE = true,"
+ " ENABLE_DEFAULT_VARCHAR_AND_BINARY_LENGTH=true, \n"
+ "DEFAULT_VARCHAR_LENGTH= 134217728;");

if (isIceberg) {
switch (serializationPolicy) {
case COMPATIBLE:
@@ -131,6 +143,7 @@ protected void setUp(
// Override Iceberg mode client lag to 1 second for faster test execution
Map<String, Object> parameterMap = new HashMap<>();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, 1000L);
parameterMap.put(ParameterProvider.MAX_ALLOWED_ROW_SIZE_IN_BYTES, 4L * MB_128);

Properties prop = Utils.createProperties(props);
SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL));
@@ -71,7 +71,7 @@ public void testBinaryComparison() throws Exception {

@Test
public void testMaxBinary() throws Exception {
byte[] arr = new byte[8 * 1024 * 1024];
byte[] arr = new byte[MB_64];
testJdbcTypeCompatibility("BINARY", arr, new ByteArrayProvider());
}
}