
Limit Database Query to Configurable Number of Rows #971

Open

philipschm1tt opened this issue Nov 25, 2020 · 13 comments

Comments
@philipschm1tt

Idea
Add a configurable limit on the number of rows fetched by the database query.

Context
We use the JDBC source connector on a large database table with a million new rows per day in mode timestamp+incrementing.
We have had outages where the JDBC source connector fell multiple days behind and then had to be restarted.
When trying to catch up, the database query became too expensive – it took so long that the connector could not catch up.

Our workaround:
We set timestamp.delay.interval.ms to multiple days, shifting the query window into the past so that the query returns fewer rows and the connector processes them quickly.
Then we reduce timestamp.delay.interval.ms step by step to allow the connector to catch up.

Ideally, we would like to limit the connector to query only a certain number of rows – for example 100k at a time.
Then the queries would stay fast even when the connector lags far behind the database table.
Then we would not need to adjust the timestamp.delay.interval.ms.

Since the connector adds the WHERE...ORDER BY... itself, we cannot directly add the limit in the query.
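
For reference, in timestamp+incrementing mode the clause the connector appends has this shape (TS, INC_ID, and SOMETABLE are placeholder names; the four ? parameters are bound by the connector from the stored offset and the current time), which is why a LIMIT cannot simply be tacked onto the configured query:

  SELECT * FROM SOMETABLE
  WHERE TS < ? AND ((TS = ? AND INC_ID > ?) OR TS > ?)
  ORDER BY TS, INC_ID ASC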

What do you think?

@cannontrodder

If you add '--' to the end of your query, you can comment out the WHERE and ORDER BY that the connector appends and instead include them in the query yourself. That way you get complete control over the whole query and can put the LIMIT wherever you want. In our case, we were able to add a TOP and move the WHERE inside a subquery.
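
A minimal sketch of the trick, assuming SQL Server and hypothetical table/column names: the connector's WHERE and ORDER BY for timestamp+incrementing mode are reproduced by hand, with the same ? placeholders in the same order so parameter binding still lines up, and the trailing -- swallows the clause the connector appends:

  SELECT * FROM
      (SELECT TOP 10000 TS, INC_ID, SOME_VALUE
       FROM SOMETABLE
       WHERE TS < ? AND ((TS = ? AND INC_ID > ?) OR TS > ?)
       ORDER BY TS ASC, INC_ID ASC) t --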

@philipschm1tt
Author

Thanks, @cannontrodder – I did not think of that :)

It might still be a nice feature.

@cannontrodder

> Thanks, @cannontrodder – I did not think of that :)
>
> It might still be a nice feature.

I'd actually like it if we could provide/implement the query for the connector ourselves instead of having it append SQL snippets to our queries. My suggestion is technically a SQL injection hack!!!

@cmrenus

cmrenus commented Feb 2, 2021

The only problem with that: how are you managing the offset? You will miss records if you are not managing limit and offset in this scenario, correct? Or is that not a concern?

@ussama-rafique

ussama-rafique commented Feb 25, 2021

I think it is a much-needed feature. It could be implemented via a property like "limit_clause" (with "top", "limit", "fetch first", etc. as possible values), and the JDBC connector would put the limit/top clause in the appropriate place in the query, as sketched below.
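
For illustration, the same 10k-row cap in a few dialects; a hypothetical "limit_clause" property would have to pick the right keyword and position per dialect (no such property exists in the connector today):

  -- MySQL / PostgreSQL: LIMIT goes after ORDER BY
  SELECT * FROM SOMETABLE ORDER BY TS LIMIT 10000;

  -- SQL Server: TOP goes right after SELECT
  SELECT TOP 10000 * FROM SOMETABLE ORDER BY TS;

  -- Oracle 12c+ / DB2 / standard SQL: FETCH FIRST goes after ORDER BY
  SELECT * FROM SOMETABLE ORDER BY TS FETCH FIRST 10000 ROWS ONLY;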

@simpleusr

Hi @maeh2k, @cannontrodder, @cmrenus, @ussama-rafique, @rhauch

We are having the same problem against an Oracle database, and the observed behavior is pretty much the same.

@maeh2k, your proposal is actually what we are applying as a workaround. But we have numerous jobs, and every config change causes a rebalance, which makes things a lot harder to manage...

As @cmrenus indicated, I also think that hacking the SQL that way would lead to offset confusion. Indeed, a similar solution was proposed for the same problem in this SO thread, but I believe it will fail for the same reason.

@rhauch, this seems to be a common and needed feature; what do you think? Do you have any suggestions?

If this somehow gets implemented, the only possible problem that comes to my mind is: how should Connect handle the situation when the actual number of records for a single incrementing criteria instance is greater than the configured max number of rows?

To be more explicit, assume the timestamp incrementing criteria is used and there are 100 records in the DB with the exact same timestamp value. If the configured max number of rows is 10, how should Connect advance the offset after polling 10 records? Simply advancing the offset would miss the remaining 90 records. But to be able to consume them in the next poll, another offset column would be required, since the records are not separable by the timestamp offset column alone.
Or should this situation be documented as a known/expected limitation?

Best regards

@philipschm1tt
Author

@simpleusr we have recently switched to @cannontrodder's "SQL injection workaround".

I have not checked the code, but I would assume that this still will not miss records if you inject the limit correctly (in Oracle, with a nested SELECT).

If the connector correctly stores the new offset based on the latest database row it processed, that row would still have the same timestamp as the 100 records, but it would set the offset to the 10th record. Then the next query would use that timestamp and offset and still return the remaining 90 records.
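
As a walk-through under the same assumptions (100 rows share timestamp 1234, incrementing IDs 1..100, and a hypothetical cap of 10 rows per query):

  -- Poll 1 returns IDs 1..10; the stored offset becomes (timestamp = 1234, id = 10).
  -- Poll 2 binds that offset into the connector's WHERE clause, effectively:
  --   WHERE TS < ? AND ((TS = 1234 AND INC_ID > 10) OR TS > 1234)
  -- The (TS = 1234 AND INC_ID > 10) branch matches IDs 11..100,
  -- so nothing is skipped as long as the row limit wraps this exact clause.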

@simpleusr

@maeh2k

Could you please provide a concrete example, since I still could not get it? (Maybe you can share the actual query, obfuscating the table and column names and simplifying if possible.)

Without any code change, I think you implemented something like the proposed SO answer I mentioned above.

If so, how could you pass the offset to the inner SELECT?

I also could not understand the second part:

> But it would set the offset to the 10th record. Then the next query would use that timestamp and offset and still return the remaining 90 records.

When we are using timestamp mode and all 100 records have the exact same timestamp value x, the offset in Connect will be set to x after consuming those 10, right? How would the next query proceed with the remaining 90 records with timestamp value x?
I assume the next query would be wrapped like "#query from task config# WHERE timestamp_column > x", and this would miss them?

Regards

@philipschm1tt
Author

philipschm1tt commented Mar 2, 2021

For Oracle, the query would look like this:

"SELECT * FROM
              (SELECT * FROM
                             (SELECT TIMESTAMP, INCREMENTING, SOME_VALUE FROM SOMETABLE)
              WHERE TIMESTAMP < ? AND ((TIMESTAMP = ? AND INCREMENTING > ?) OR TIMESTAMP > ?)
              ORDER BY TIMESTAMP ASC, INCREMENTING ASC)
WHERE ROWNUM < 10000 --"

That is, you take the WHERE condition from the timestamp+incrementing configuration
and wrap it in another SELECT to limit the number of rows, Oracle-style.
The connector supplies the query parameters.
The -- at the end comments out the original WHERE condition that the connector appends.
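
As a variation, assuming Oracle 12c or later (which supports the standard row-limiting clause), the ROWNUM wrapper could be replaced with FETCH FIRST:

  SELECT TIMESTAMP, INCREMENTING, SOME_VALUE FROM SOMETABLE
  WHERE TIMESTAMP < ? AND ((TIMESTAMP = ? AND INCREMENTING > ?) OR TIMESTAMP > ?)
  ORDER BY TIMESTAMP ASC, INCREMENTING ASC
  FETCH FIRST 10000 ROWS ONLY --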

@simpleusr

Hi @maeh2k

Many thanks for the collaboration. Stupid me, I was missing the "--" at the end, which comments out the WHERE clause appended by Connect...

To put it another way: per my understanding, what you achieve is plugging the timestampIncrementingWhereClause logic:

 protected void timestampIncrementingWhereClause(ExpressionBuilder builder) {
    // This version combines two possible conditions. The first checks timestamp == last
    // timestamp and incrementing > last incrementing. The timestamp alone would include
    // duplicates, but adding the incrementing condition ensures no duplicates, e.g. you would
    // get only the row with id = 23:
    //  timestamp 1234, id 22 <- last
    //  timestamp 1234, id 23
    // The second check only uses the timestamp >= last timestamp. This covers everything new,
    // even if it is an update of the existing row. If we previously had:
    //  timestamp 1234, id 22 <- last
    // and then these rows were written:
    //  timestamp 1235, id 22
    //  timestamp 1236, id 23
    // We should capture both id = 22 (an update) and id = 23 (a new row)
    builder.append(" WHERE ");
    coalesceTimestampColumns(builder);
    builder.append(" < ? AND ((");
    coalesceTimestampColumns(builder);
    builder.append(" = ? AND ");
    builder.append(incrementingColumn);
    builder.append(" > ?");
    builder.append(") OR ");
    coalesceTimestampColumns(builder);
    builder.append(" > ?)");
    builder.append(" ORDER BY ");
    coalesceTimestampColumns(builder);
    builder.append(",");
    builder.append(incrementingColumn);
    builder.append(" ASC");
  }

into your row-limited query, without breaking Connect's query semantics, and this, I believe, is a very clever workaround.

In my case, I am using mode timestamp (as opposed to your timestamp+incrementing).

So I had to wrap my query as below:

 SELECT * FROM
              (SELECT * FROM
                             (SELECT TIMESTAMP, SOME_VALUE FROM SOMETABLE)
              WHERE TIMESTAMP > ? AND TIMESTAMP < ? ORDER BY TIMESTAMP ASC )
WHERE ROWNUM < 10000

to plug in the timestampWhereClause logic:

 protected void timestampWhereClause(ExpressionBuilder builder) {
    builder.append(" WHERE ");
    coalesceTimestampColumns(builder);
    builder.append(" > ? AND ");
    coalesceTimestampColumns(builder);
    builder.append(" < ? ORDER BY ");
    coalesceTimestampColumns(builder);
    builder.append(" ASC");
  }

And it seems to work as expected in our test environment. Many thanks again...

But unfortunately, in my case skipped records may occur if the number of rows having the exact same timestamp value is greater than the provided ROWNUM value.

You actually have two offset columns, timestamp and incrementing. In my case, unfortunately, there is no auto-increment-like column to use as the incrementing column, just the timestamp column. The timestamp + incrementing combination can uniquely identify a row in your table, but not in mine, since I only have the single timestamp column.
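
Concretely, the skipping scenario with a single timestamp offset column looks like this (illustrative values, hypothetical cap of 10 rows):

  -- 100 rows share TS = 1234; ROWNUM caps each poll at 10 rows.
  -- Poll 1 returns 10 of them; Connect stores offset TS = 1234.
  -- Poll 2 then effectively runs:
  --   ... WHERE TS > 1234 AND TS < ? ORDER BY TS ASC
  -- The remaining 90 rows with TS = 1234 no longer match and are skipped.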

But anyway, given the situation, this seems to be the best option. So many thanks again :)

Lastly, I would like your thoughts on query timeouts. I opened an issue but have not received any comments/feedback so far. What do you think about that? Did you make any customizations to apply a timeout?

Best regards

@crutis

crutis commented Jan 13, 2022

So why isn't batch.max.rows just added as a LIMIT clause to the generated query? (Until I hit this error yesterday, that was my assumption.)
java.sql.SQLTransientConnectionException: (conn=8590895) Error writing file '/rdsdbdata/tmp/MYy37HKO' (Errcode: 28 - No space left on device)
Apparently, if you query a very large table, even if you only want 10k rows at a time, it is going to kill your server, because it attempts to write all the data to temp space.

@luxpermanet

I don't have an Oracle setup, but how about using the "query.suffix" configuration parameter?

https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/source_config_options.html

The documentation describes it as the "suffix to append at the end of the generated query". Could this be an option?

For example: "query.suffix": " FETCH FIRST 10 ROWS ONLY".
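
With that setting, the query generated in timestamp mode should end up shaped roughly like this (placeholder names; note the suffix lands after the connector's ORDER BY, which is where the standard row-limiting clause belongs on Oracle 12c+ and DB2):

  SELECT * FROM SOMETABLE
  WHERE TS > ? AND TS < ?
  ORDER BY TS ASC FETCH FIRST 10 ROWS ONLY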

@cannontrodder

@luxpermanet it can work. Sometimes, though, the query you need in order to pull from the table requires nested subqueries, and to force a particular execution plan the LIMIT or TOP directive needs to sit in a subquery, with the Connect-applied WHERE clause outside it. My solution above helps in that case. What would be better still is being able to completely replace, with our own, the query used once there is an offset in Connect's topics.
