CV2-4953: Storing ClassyCat data in Alegre #105
base: master
Changes from all commits
dd569c5
f19e9b9
659fd68
87669fa
1272766
@@ -20,14 +20,16 @@ def get_client(self):
     def classify(self, task_prompt, items_count, max_tokens_per_item=200):
         pass

+
 class AnthropicClient(LLMClient):
     def __init__(self, model_name):
         super().__init__()
         self.model_name = model_name

     def get_client(self):
         if self.client is None:
-            self.client = Anthropic(api_key=os.environ.get('ANTHROPIC_API_KEY'), timeout=httpx.Timeout(60.0, read=60.0, write=60.0, connect=60.0), max_retries=0)
+            self.client = Anthropic(api_key=os.environ.get('ANTHROPIC_API_KEY'),
+                                    timeout=httpx.Timeout(60.0, read=60.0, write=60.0, connect=60.0), max_retries=0)
         return self.client

     def classify(self, task_prompt, items_count, max_tokens_per_item=200):
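As a side note on the lazy initialization shown above, here is a minimal usage sketch (the model name string is an arbitrary placeholder; anything about `LLMClient` beyond what appears in this diff is assumed):

```python
# Sketch: get_client() builds the Anthropic client once and caches it on self.client,
# so repeated calls reuse the same instance with the configured timeout and max_retries=0.
client_wrapper = AnthropicClient(model_name="claude-3-haiku")  # placeholder model name
first = client_wrapper.get_client()
second = client_wrapper.get_client()
assert first is second  # same cached client object
```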
@@ -43,6 +45,7 @@ def classify(self, task_prompt, items_count, max_tokens_per_item=200):
         return completion.content[0].text

+
 class OpenRouterClient(LLMClient):
     def __init__(self, model_name):
         super().__init__()
@@ -65,7 +68,7 @@ def classify(self, task_prompt, items_count, max_tokens_per_item=200):
             max_tokens=(max_tokens_per_item * items_count) + 15,
             temperature=0.5
         )
-        # TODO: record metric here with model name and number of items submitted (https://meedan.atlassian.net/browse/CV2-4987)
+        # TODO: record metric here with model name and number of items submitted (https://meedan.atlassian.net/browse/CV2-4987)
         return completion.choices[0].message.content
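The TODO above (CV2-4987) asks for a metric recording the model name and the number of items submitted. A rough sketch of one way that could look, assuming a Prometheus-style counter is used (the metric name, label, and use of `prometheus_client` are illustrative assumptions, not part of this PR):

```python
from prometheus_client import Counter

# Hypothetical counter for CV2-4987; the name and label are assumptions for illustration.
CLASSIFIED_ITEMS = Counter(
    'classycat_classified_items_total',
    'Number of items submitted for LLM classification',
    ['model_name'],
)

def record_classification_metric(model_name: str, items_count: int) -> None:
    # Increment the counter by the number of items sent in this request.
    CLASSIFIED_ITEMS.labels(model_name=model_name).inc(items_count)
```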
@@ -137,17 +140,33 @@ def classify_and_store_results(self, schema_id, items):

             result['labels'] = [label for label in result['labels'] if label in permitted_labels]

         # if there is at least one item with labels, save the results to s3
         if not all([len(result['labels']) == 0 for result in final_results]):
             results_file_id = str(uuid.uuid4())
             upload_file_to_s3(self.output_bucket, f"{schema_id}/{results_file_id}.json", json.dumps(final_results))

-        return final_results
+        # prepare the final results to be stored in alegre
+        # save "content" and "context"
+        # content is text, doc_id is the item's unique id, and context is input id, labels, schema_id, and model name
+        final_results_to_be_stored_in_alegre = {'documents': [
+            {'doc_id': str(uuid.uuid4()),  # adding a unique id for each item to not rely on the input id for uniqueness
+             'content': items[i]['text'],
+             'context': {
+                 'input_id': items[i]['id'],
Review comment: I think it would be helpful to put a `'source': 'ClassyCat'` here, so that if we later want to query these objects there is a key to do it.
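A minimal sketch of that suggestion, with the proposed `'source'` key added to the per-item context (the `build_context` helper is hypothetical and only illustrates the shape of the dict, not code from this PR):

```python
def build_context(item, labels, schema_id, model_name):
    # Sketch of the reviewer's suggestion: a 'source' key makes it possible to
    # filter ClassyCat documents later when querying Alegre.
    return {
        'source': 'ClassyCat',
        'input_id': item['id'],
        'labels': labels,
        'schema_id': schema_id,
        'model_name': model_name,
    }
```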
+                 'labels': final_results[i]['labels'],
+                 'schema_id': schema_id,
+                 'model_name': self.llm_client.model_name}}
Review comment: I think this needs to be the model name for the text vectorization? i.e.

Review comment: Within

Review comment: Good catch Scott, I did not know that we should specify model_name, and on local the documents are being indexed only by Elasticsearch (full text search). I will update the code and redo the tests to make sure vectorization works on local.
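To illustrate the vectorization point, a sketch of a document that also names the similarity models Alegre should index it with. The `models` key and the model identifiers here are assumptions for illustration and should be checked against Alegre's actual API; they are distinct from the classifier `model_name` stored in the context:

```python
# Sketch only: the 'models' key and its values are assumed, not confirmed against Alegre.
# Without an explicit vectorization model, the reviewers note the documents were being
# indexed only by Elasticsearch full-text search on local.
document = {
    'doc_id': 'example-doc-uuid',
    'content': 'example item text',
    'models': ['elasticsearch', 'paraphrase-multilingual-mpnet-base-v2'],  # assumed key and values
    'context': {
        'input_id': 'example-input-id',
        'labels': ['example-label'],
        'schema_id': 'example-schema-id',
        'model_name': 'classifier-model-name',  # the LLM used for classification
    },
}
```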
+            for i in range(len(items))]}
+
+        # call alegre endpoint to store the results: /text/bulk_similarity/
+        alegre_url = os.getenv('ALEGRE_URL')
+        httpx.post(alegre_url + '/text/bulk_similarity/', json=final_results_to_be_stored_in_alegre)
Review comment: Ooh, does this insert endpoint work? Maybe I can switch timpani to use it as well when we convert to batch mode!

Review comment: We think so, but @DGaffney is currently moving all vectorization to Presto and will need to ensure this endpoint continues to work after that migration 😅

Review comment: Makes sense Scott, I added a comment on Devin's ticket to make sure he sees it. @skyemeedan as of this moment the endpoint works fine locally for me, feel free to test it out!

Review comment: I'm not sure we want to support this after we move to Presto vectorization. This is a blocking, bulk query, which doubly breaks expectations in Presto-based vectorization (non-blocking, and a single query per item). This is, to my knowledge, the only location in Check (minus the re-index job in Check-API) that uses

Review comment: I don't think it is reasonable to take on making async endpoints default to bulk until after we have made text work on Presto. We've shunted secondary features into this refactor before, and the net effect is that they have greatly complicated our existing migration plan.

Review comment: If supporting bulk requires some refactoring of Presto payload structures, wouldn't it be easier to do that before we are supporting full text processing in live?
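For contrast with the bulk call in this diff, a rough sketch of a per-item variant using the single-item `/text/similarity/` endpoint discussed below. Whether that endpoint accepts `doc_id` or this exact payload shape is an assumption; this helper is not part of the PR:

```python
import os

import httpx

def store_documents_one_by_one(documents):
    # Sketch: one POST per document instead of a single blocking bulk request,
    # closer to the "single query per item" expectation mentioned for Presto.
    alegre_url = os.getenv('ALEGRE_URL')
    with httpx.Client(timeout=30.0) as client:
        for doc in documents:
            client.post(alegre_url + '/text/similarity/', json={
                'doc_id': doc['doc_id'],   # assumed to be accepted by this endpoint
                'text': doc['content'],    # single-item endpoint uses 'text', per the review below
                'context': doc['context'],
            })
```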
+
+        return final_results

     def schema_id_exists(self, schema_id):
         return file_exists_in_s3(self.output_bucket, f"{schema_id}.json")


     def process(self, message: Message) -> ClassyCatBatchClassificationResponse:
         # Example input:
         # {
Review comment: For the `/text/similarity` endpoint this parameter is `text` and not `content`. Let's double check this is the correct name for the bulk endpoint.

Review comment: At least for `/text/bulk_similarity/` we replace `text` with `content`: we can use both `content` and `text` for bulk, but it ultimately gets renamed to `content` in Alegre.
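To make the naming difference concrete, a small sketch contrasting the two payload shapes as described in this thread (field values are placeholders):

```python
# Single-item /text/similarity/ payload: the text field is named 'text'.
single_payload = {
    'text': 'example item text',
    'context': {'input_id': 'example-input-id'},
}

# Bulk /text/bulk_similarity/ payload: the per-document field is 'content'
# ('text' is also accepted, but Alegre reportedly renames it to 'content').
bulk_payload = {
    'documents': [
        {'doc_id': 'example-doc-uuid',
         'content': 'example item text',
         'context': {'input_id': 'example-input-id'}},
    ],
}
```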