Skip to content

Commit

Permalink
push down into_concurrent to be exclusive to rust code
Browse files Browse the repository at this point in the history
  • Loading branch information
pacman82 committed Oct 13, 2024
1 parent c4979e5 commit 6f40597
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
20 changes: 7 additions & 13 deletions python/arrow_odbc/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@ def next_batch(self):
struct_array = Array._import_from_c(array_ptr, schema_ptr)
return RecordBatch.from_struct_array(struct_array)

def into_concurrent(self):
error = lib.arrow_odbc_reader_into_concurrent(self.handle)
raise_on_error(error)

def query(
self,
connection: ConnectionRaii,
Expand Down Expand Up @@ -117,9 +113,10 @@ def bind_buffers(
max_bytes_per_batch: int,
max_text_size: int,
max_binary_size: int,
falliable_allocations: bool = False,
schema: Optional[Schema] = None,
map_schema: Optional[Callable[[Schema], Schema]] = None,
falliable_allocations: bool,
schema: Optional[Schema],
map_schema: Optional[Callable[[Schema], Schema]],
fetch_concurrently: bool
):
if map_schema is not None:
schema = map_schema(self.schema())
Expand All @@ -133,6 +130,7 @@ def bind_buffers(
max_text_size,
max_binary_size,
falliable_allocations,
fetch_concurrently,
ptr_schema,
)
# See if we managed to execute the query successfully and return an error if not
Expand Down Expand Up @@ -291,11 +289,9 @@ def more_results(
falliable_allocations=falliable_allocations,
schema=schema,
map_schema=map_schema,
fetch_concurrently=fetch_concurrently,
)

if fetch_concurrently:
self.reader.into_concurrent()

# Every result set can have its own schema, so we must update our member
self.schema = self.reader.schema()

Expand Down Expand Up @@ -480,11 +476,9 @@ def read_arrow_batches_from_odbc(
falliable_allocations=falliable_allocations,
schema=schema,
map_schema=map_schema,
fetch_concurrently=fetch_concurrently,
)

if fetch_concurrently:
reader.into_concurrent()

return BatchReader(reader)


Expand Down
6 changes: 6 additions & 0 deletions src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ pub unsafe extern "C" fn arrow_odbc_reader_bind_buffers(
max_text_size: usize,
max_binary_size: usize,
fallibale_allocations: bool,
fetch_concurrently: bool,
schema: *mut c_void,
) -> *mut ArrowOdbcError {
let schema = take_schema(schema);
Expand All @@ -184,6 +185,11 @@ pub unsafe extern "C" fn arrow_odbc_reader_bind_buffers(
);
// Move cursor to the next result set.
try_!(reader.as_mut().promote_to_reader(reader_builder));

if fetch_concurrently {
try_!(reader.as_mut().into_concurrent());
}

null_mut()
}

Expand Down

0 comments on commit 6f40597

Please sign in to comment.