Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dropbox] Add Unstructured file decoding to Dropbox #11

Merged
merged 4 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion dropbox/.env-template
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,8 @@ DROPBOX_APP_KEY=
DROPBOX_APP_SECRET=
tianjing-li marked this conversation as resolved.
Show resolved Hide resolved
DROPBOX_SEARCH_LIMIT=5
DROPBOX_PATH=
DROPBOX_CONNECTOR_API_KEY=
DROPBOX_CONNECTOR_API_KEY=

# Unstructured
DROPBOX_UNSTRUCTURED_BASE_URL=
DROPBOX_UNSTRUCTURED_API_KEY=
tianjing-li marked this conversation as resolved.
Show resolved Hide resolved
13 changes: 11 additions & 2 deletions dropbox/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ This package is a utility for connecting Cohere to Dropbox, featuring a simple l

The Dropbox connector currently searches for all active files within your Dropbox instance. Note that new files added will require a couple minutes of indexing time to be searchable. Dropbox usually takes less than 5 minutes.

**Please note, currently, the contents of individual files is not decoded.**

## Configuration

To use the Dropbox connector, first create an app in the [Developer App Console](https://www.dropbox.com/developers/apps). Select Scoped Access, and give it the access type it needs. Note that `App folder` access will give your app access to a folder specifically created for your app, while `Full Dropbox` access will give your app access to all files and folders currently in your Dropbox instance.
Expand Down Expand Up @@ -60,6 +58,17 @@ curl -X POST \
}'
```

## Unstructured

To decode file contents, this connector leverages [Unstructured](https://unstructured.io). You can generate a free [API key here](https://unstructured.io/api-key).

It is necessary to provide the following values in the `.env`:

- `DROPBOX_UNSTRUCTURED_BASE_URL`
- `DROPBOX_UNSTRUCTURED_API_KEY`

Use the API key generated earlier. To quickstart usage, you can use the hosted `https://api.unstructured.io` as the base URL, or you can [host your own Unstructured server](https://unstructured-io.github.io/unstructured/apis/usage_methods.html).

## Development

Create a virtual environment and install dependencies with poetry. We recommend using in-project virtual environments:
Expand Down
367 changes: 366 additions & 1 deletion dropbox/poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dropbox/provider/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def download_file(self, path):


def get_client(oauth_token=None):
search_limit = app.config.get("SEARCH_LIMIT", 5)
search_limit = app.config.get("SEARCH_LIMIT", 3)
path = app.config.get("PATH", "")
env_token = app.config.get("ACCESS_TOKEN", "")
token = None
Expand Down
36 changes: 33 additions & 3 deletions dropbox/provider/provider.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
from typing import Any

from .client import get_client
from .unstructured import get_unstructured_client


def search(query: str, oauth_token: str = None) -> list[dict[str, Any]]:
dbx_client = get_client(oauth_token)
unstructured_client = get_unstructured_client()
dbx_results = dbx_client.search(query)

return serialize_results(dbx_results, dbx_client, unstructured_client)


def serialize_results(dbx_results, dbx_client, unstructured_client):
results = []

for dbx_result in dbx_results.matches:
if not (metadata := dbx_result.metadata.get_metadata()):
continue
Expand All @@ -17,12 +24,35 @@ def search(query: str, oauth_token: str = None) -> list[dict[str, Any]]:

metadata, f = dbx_client.download_file(metadata.path_display)

import pdb

result = {
"type": "file",
"id": metadata.id,
"title": metadata.name,
"text": str(f.content),
"type": "file",
"raw_content": f.content,
}
# TODO: decode file contents

results.append(result)

# Group for batch Unstructured requests
files = [
(result["id"], result["title"], result["raw_content"]) for result in results
]
unstructured_content = unstructured_client.batch_get(files)

# Now build text field based off Unstructured return
for result in results:
del result["raw_content"]
walterbm-cohere marked this conversation as resolved.
Show resolved Hide resolved
file_name = result["title"]

if file_name in unstructured_content:
# Build text
text = ""
for element in unstructured_content[file_name]:
text += f' {element.get("text")}'

if text != "":
result["text"] = text

return results
117 changes: 117 additions & 0 deletions dropbox/provider/unstructured.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import sys
import asyncio
import aiohttp
import logging
import functools
from collections import OrderedDict
from flask import current_app as app

logger = logging.getLogger(__name__)

CACHE_LIMIT_BYTES = 20 * 1024 * 1024 # 20 MB to bytes
TIMEOUT_SECONDS = 20

unstructured = None


class UnstructuredRequestSession:
def __init__(self, unstructured_base_url, api_key):
self.get_content_url = f"{unstructured_base_url}/general/v0/general"
self.api_key = api_key
# Manually cache because functools.lru_cache does not support async methods
self.cache = OrderedDict()
self.start_session()

def start_session(self):
self.loop = asyncio.new_event_loop()
# Create ClientTimeout object to apply timeout for every request in the session
client_timeout = aiohttp.ClientTimeout(total=TIMEOUT_SECONDS)
self.session = aiohttp.ClientSession(loop=self.loop, timeout=client_timeout)

def close_loop(self):
self.loop.stop()
self.loop.close()

def cache_size(self):
# Calculate the total size of values in bytes
total_size_bytes = functools.reduce(
lambda a, b: a + b, map(lambda v: sys.getsizeof(v), self.cache.values()), 0
)

return total_size_bytes

def cache_get(self, key):
self.cache.move_to_end(key)

return self.cache[key]

def cache_put(self, key, item):
self.cache[key] = item

while self.cache_size() > CACHE_LIMIT_BYTES:
self.cache.popitem()

async def close_session(self):
await self.session.close()

async def get_unstructured_content(self, file):
# Unpack tuple
file_id, file_name, file_data = file

# Check cache
if file_id in self.cache:
return self.cache_get(file_id)

# Use FormData to pass in files parameter
data = aiohttp.FormData()
data.add_field("files", file_data, filename=file_name)

# API key optional if self-hosted
headers = {} if self.api_key is None else {"unstructured-api-key": self.api_key}

async with self.session.post(
self.get_content_url,
headers=headers,
data=data,
) as response:
content = await response.json()
if not response.ok:
logger.error(f"Error response from Unstructured: {content}")
return None

self.cache_put(file_id, (file_name, content))

return self.cache[file_id]

async def gather(self, files):
tasks = [self.get_unstructured_content(file) for file in files]
return await asyncio.gather(*tasks)

def batch_get(self, files):
results = self.loop.run_until_complete(self.gather(files))
results = [result for result in results if result is not None]

result_dict = {
filename: content for filename, content in results if content is not None
}

# Close session and loop
self.loop.run_until_complete(self.close_session())
self.close_loop()

return result_dict


def get_unstructured_client():
global unstructured
if unstructured is not None:
return unstructured

assert (
unstructured_base_url := app.config.get("UNSTRUCTURED_BASE_URL")
), "DROPBOX_UNSTRUCTURED_BASE_URL must be set"
api_key = app.config.get("UNSTRUCTURED_API_KEY", None)

unstructured = UnstructuredRequestSession(unstructured_base_url, api_key)

return unstructured
1 change: 1 addition & 0 deletions dropbox/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ python-dotenv = "^1.0.0"
dropbox = "^11.36.2"
requests = "^2.31.0"
gunicorn = "^21.2.0"
aiohttp = "^3.9.1"


[tool.poetry.group.development.dependencies]
Expand Down
4 changes: 1 addition & 3 deletions sharepoint/provider/unstructured.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ def batch_get(self, files):
results = [result for result in results if result is not None]

result_dict = {
filename: content[:20]
for filename, content in results
if content is not None
filename: content for filename, content in results if content is not None
}

# Close session and loop
Expand Down