Implement adaptive flush to disk during fetch stage (to avoid OOM) in all export scripts #28
I was in the process of creating an implementation plan to modify the 'vespa_export.py' and 'azureai_export.py' scripts to implement the 'adaptive flush to disk' feature, but ran out of time before I could finish and will need more time to complete this task. Please let me know if you have any questions or need further clarification on anything.

Implementation proposal (work in progress)

Step 1: Implement adaptive flush to disk in 'vertexai_vector_search_export.py'
Modify the 'vertexai_vector_search_export.py' script to implement the 'adaptive flush to disk' feature, using the 'astradb_export.py' script as a reference.

Step 2: Implement adaptive flush to disk in 'milvus_export.py'
Modify the 'milvus_export.py' script to implement the 'adaptive flush to disk' feature, using the 'astradb_export.py' script as a reference.

Step 3: Implement adaptive flush to disk in 'vespa_export.py'
Modify the 'vespa_export.py' script to implement the 'adaptive flush to disk' feature, using the 'astradb_export.py' script as a reference.

For more information about Ellipsis, check the documentation.
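The three steps above all point to 'astradb_export.py' as the reference implementation. As a rough sketch of the general pattern being referenced (not the actual code in 'astradb_export.py'; the helper name, threshold constant, and output path are hypothetical), the adaptive flush could be factored into a small helper that each export script's fetch loop calls after appending a record:

import os
from datetime import datetime

import pandas as pd
import psutil

MAX_MEMORY_PERCENT = 80  # hypothetical threshold; could be exposed as a CLI flag

def adaptive_flush(data_batch, output_dir, name, force=False):
    # Write the accumulated batch to a timestamped Parquet file and clear it
    # whenever system memory usage crosses the threshold (or when forced).
    if data_batch and (force or psutil.virtual_memory().percent > MAX_MEMORY_PERCENT):
        path = os.path.join(output_dir, f"{name}_{datetime.now().timestamp()}.parquet")
        pd.DataFrame(data_batch).to_parquet(path)
        data_batch.clear()

Each of 'vertexai_vector_search_export.py', 'milvus_export.py', and 'vespa_export.py' would then call such a helper inside its record-fetching loop, plus a final call with force=True after the loop to flush any remainder.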
To implement adaptive flush to disk during the fetch stage across all export scripts, follow these steps tailored to the structure of each script in the repository. The general pattern is to accumulate fetched records in a batch and flush it to disk whenever memory usage climbs too high, for example:
import psutil
import pandas as pd
from datetime import datetime

def get_data(self):
    table_names = self.args['tables'].split(',')
    for table_name in table_names:
        data_batch = []
        for record in self.session.table(table_name).query():
            data_batch.append(record)
            # Flush early if memory pressure gets too high.
            if psutil.virtual_memory().percent > 80:  # Example threshold
                self.flush_to_disk(data_batch, table_name)
                data_batch = []
        if data_batch:  # Flush remaining data
            self.flush_to_disk(data_batch, table_name)

def flush_to_disk(self, data_batch, table_name):
    # Convert data_batch to a DataFrame and save it to a Parquet file.
    df = pd.DataFrame(data_batch)
    df.to_parquet(f'{table_name}_{datetime.now().timestamp()}.parquet')
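The 80% threshold above is hard-coded, so it may also be worth exposing it as a command-line option that users can tune for their machine. The '--max_memory_percent' flag below is a hypothetical sketch, not an existing vector-io option:

import argparse

parser = argparse.ArgumentParser(description="Export with adaptive flush to disk")
parser.add_argument("--tables", required=True, help="Comma-separated list of tables to export")
parser.add_argument(
    "--max_memory_percent",
    type=int,
    default=80,
    help="Flush the in-memory batch to Parquet once system memory usage exceeds this percentage",
)
args = vars(parser.parse_args())  # the export class reads options from a dict, e.g. self.args['tables']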
This solution provides a scalable way to handle large data exports without running into OOM errors, ensuring data is safely written to disk in manageable batches.

References: ai-northstar-tech/vector-io/src/vdf_io/export_vdf/astradb_export.py