started refactoring all tasks
pitneitemeier committed Feb 21, 2024
1 parent 527b3b7 commit fa10781
Showing 35 changed files with 283 additions and 1,442 deletions.
src/intelligence_layer/connectors/limited_concurrency_client.py
@@ -1,3 +1,4 @@
from functools import lru_cache
from os import getenv
from threading import Semaphore
from typing import Any, Mapping, Optional, Protocol, Sequence
@@ -111,6 +112,7 @@ def __init__(
self._concurrency_limit_semaphore = Semaphore(max_concurrency)

@classmethod
@lru_cache(maxsize=1)
def from_token(
cls, token: Optional[str] = None, host: str = "https://api.aleph-alpha.com"
) -> "LimitedConcurrencyClient":
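The @lru_cache(maxsize=1) added above memoizes from_token, so repeated calls with the same token and host reuse one client instance. A minimal, self-contained sketch of that behavior; the Client class below is a hypothetical stand-in for LimitedConcurrencyClient, not the real implementation:

from functools import lru_cache


class Client:
    """Hypothetical stand-in for LimitedConcurrencyClient (illustration only)."""

    def __init__(self, token: str, host: str) -> None:
        self.token = token
        self.host = host

    @classmethod
    @lru_cache(maxsize=1)
    def from_token(cls, token: str, host: str = "https://api.aleph-alpha.com") -> "Client":
        # With maxsize=1, repeated calls with identical arguments return the
        # previously constructed instance instead of building a new one.
        return cls(token, host)


a = Client.from_token("my-token")
b = Client.from_token("my-token")
assert a is b  # same cached instance
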
3 changes: 0 additions & 3 deletions src/intelligence_layer/core/__init__.py
@@ -11,9 +11,6 @@
from .complete import Complete as Complete
from .complete import CompleteInput as CompleteInput
from .complete import CompleteOutput as CompleteOutput
from .complete import Instruct as Instruct
from .complete import InstructInput as InstructInput
from .complete import PromptOutput as PromptOutput
from .detect_language import DetectLanguage as DetectLanguage
from .detect_language import DetectLanguageInput as DetectLanguageInput
from .detect_language import DetectLanguageOutput as DetectLanguageOutput
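With these re-exports removed, code that imported the instruct primitives from intelligence_layer.core stops resolving; a small illustration, assuming nothing else in this commit re-adds the names under another path:

# Illustration only: after this commit the commented import would raise ImportError,
# while the Complete re-exports kept above continue to resolve.
# from intelligence_layer.core import Instruct, InstructInput, PromptOutput  # removed
from intelligence_layer.core import Complete, CompleteInput, CompleteOutput  # still exported
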
15 changes: 6 additions & 9 deletions src/intelligence_layer/core/chunk.py
@@ -6,6 +6,7 @@
from intelligence_layer.connectors.limited_concurrency_client import (
AlephAlphaClientProtocol,
)
from intelligence_layer.core.model import AlephAlphaModel
from intelligence_layer.core.task import Task
from intelligence_layer.core.tracer import TaskSpan

@@ -52,12 +53,9 @@ class ChunkTask(Task[ChunkInput, ChunkOutput]):
max_tokens_per_chunk: The maximum number of tokens to fit into one chunk.
"""

def __init__(
self, client: AlephAlphaClientProtocol, model: str, max_tokens_per_chunk: int
):
def __init__(self, model: AlephAlphaModel, max_tokens_per_chunk: int):
super().__init__()
tokenizer = client.tokenizer(model)
self._splitter = HuggingFaceTextSplitter(tokenizer)
self._splitter = HuggingFaceTextSplitter(model.get_tokenizer())
self._max_tokens_per_chunk = max_tokens_per_chunk

def do_run(self, input: ChunkInput, task_span: TaskSpan) -> ChunkOutput:
@@ -84,8 +82,7 @@ class ChunkOverlapTask(Task[ChunkInput, ChunkOutput]):

def __init__(
self,
client: AlephAlphaClientProtocol,
model: str,
model: AlephAlphaModel,
max_tokens_per_chunk: int,
overlap_length_tokens: int,
):
@@ -96,8 +93,8 @@ def __init__(
overlap_length_tokens, max_tokens_per_chunk
)
)
self.chunk_task = ChunkTask(client, model, overlap_length_tokens // 2)
self.tokenizer = client.tokenizer(model)
self.chunk_task = ChunkTask(model, overlap_length_tokens // 2)
self.tokenizer = model.get_tokenizer()
self.max_tokens_per_chunk = max_tokens_per_chunk
self.overlap_length_tokens = overlap_length_tokens

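ChunkTask and ChunkOverlapTask now take a single AlephAlphaModel instead of a client plus model name. A hedged migration sketch; the AlephAlphaModel constructor arguments and the ChunkInput field name are assumptions, not confirmed by this diff:

from intelligence_layer.core import InMemoryTracer
from intelligence_layer.core.chunk import ChunkInput, ChunkTask
from intelligence_layer.core.model import AlephAlphaModel

# Before this commit: ChunkTask(client, "luminous-base", max_tokens_per_chunk=512)
model = AlephAlphaModel(name="luminous-base")  # assumed constructor signature
task = ChunkTask(model, max_tokens_per_chunk=512)
chunks = task.run(ChunkInput(text="A long document ..."), InMemoryTracer())  # `text` field assumed
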
241 changes: 0 additions & 241 deletions src/intelligence_layer/core/complete.py
@@ -64,244 +64,3 @@ def do_run(self, input: CompleteInput, task_span: TaskSpan) -> CompleteOutput:
model=input.model,
)
return CompleteOutput(response=response)


class InstructInput(BaseModel):
"""The input for an `Instruct`.
Attributes:
instruction: A textual instruction for the model.
Could be a directive to answer a question or to translate something.
input: The text input for the instruction, e.g. a text to be translated.
response_prefix: A string that is provided to the LLM as a prefix of the response.
This can steer the model completion.
maximum_response_tokens: The maximum number of tokens to be generated in the answer.
The default corresponds to roughly one short paragraph.
"""

instruction: str
input: Optional[str]
response_prefix: str = ""
maximum_response_tokens: int = 64


class PromptOutput(CompleteOutput):
"""The output of an `Instruct` or `FewShot` task.
Attributes:
response: The generated response to the instruction.
rich_prompt: To handle the instruction, a `PromptTemplate` is used.
The template defines two `PromptRange` instances:
- "instruction": covering the instruction text.
- "input": covering the input text.
These can for example be used for downstream `TextHighlight` tasks.
"""

rich_prompt: RichPrompt


class Instruct(Task[InstructInput, PromptOutput]):
"""Runs zero-shot instruction completions on a model.
Can be used for various types of instructions an LLM could handle, like QA, summarization,
translation and more.
Args:
client: Aleph Alpha client instance for running model related API calls.
model: The name of the model that should handle the instruction.
Certain models are optimized for handling such instruction tasks.
Typically their name contains 'control', e.g. 'luminous-extended-control'.
Attributes:
INSTRUCTION_PROMPT_TEMPLATE: The prompt-template used to build the actual `Prompt` sent
to the inference API.
Example:
>>> import os
>>> from intelligence_layer.connectors import LimitedConcurrencyClient
>>> from intelligence_layer.core import InMemoryTracer, Instruct, InstructInput
>>> client = LimitedConcurrencyClient.from_token(os.getenv("AA_TOKEN"))
>>> task = Instruct(client, model="luminous-base-control")
>>> input = InstructInput(
... instruction="Translate the following text to German.",
... input="An apple a day keeps the doctor away.",
... )
>>> tracer = InMemoryTracer()
>>> output = task.run(input, tracer)
"""

INSTRUCTION_PROMPT_TEMPLATE = """### Instruction:
{% promptrange instruction %}{{instruction}}{% endpromptrange %}
{% if input %}
### Input:
{% promptrange input %}{{input}}{% endpromptrange %}
{% endif %}
### Response:{{response_prefix}}"""

def __init__(self, client: AlephAlphaClientProtocol, model: str) -> None:
super().__init__()
self._client = client
self._completion = Complete(client)
self._model = model

def do_run(self, input: InstructInput, task_span: TaskSpan) -> PromptOutput:
prompt = PromptTemplate(self.INSTRUCTION_PROMPT_TEMPLATE).to_rich_prompt(
input=input.input,
instruction=input.instruction,
response_prefix=input.response_prefix,
)
response = self._complete(
prompt,
input.maximum_response_tokens,
self._model,
task_span,
)
return PromptOutput(response=response, rich_prompt=prompt)

def _complete(
self, prompt: Prompt, maximum_tokens: int, model: str, task_span: TaskSpan
) -> CompletionResponse:
request = CompletionRequest(prompt, maximum_tokens=maximum_tokens)
return self._completion.run(
CompleteInput(request=request, model=model),
task_span,
).response


class FewShotExample(BaseModel):
input: str
response: str


class FewShotConfig(BaseModel):
"""Config for a few-shot prompt without dynamic input.
Attributes:
instruction: A textual instruction for the model.
Could be a directive to answer a question or to translate something.
examples: A number of few shot examples to prime the model.
input_prefix: The prefix for each `FewShotExample.input` as well as the final input.
response_prefix: The prefix for each `FewShotExample.response` as well as the completion.
"""

instruction: str
examples: Sequence[FewShotExample]
input_prefix: str
response_prefix: str
additional_stop_sequences: Sequence[str] = Field(default_factory=list)


class FewShotInput(BaseModel):
"""The input for a `FewShot` task.
Attributes:
few_shot_config: The configuration to be used for generating a response.
input: The text input for the prompt, e.g. a text to be translated.
maximum_response_tokens: The maximum number of tokens to be generated in the answer.
The default corresponds to roughly one short paragraph.
"""

few_shot_config: FewShotConfig
input: str
maximum_response_tokens: int = 64


class FewShot(Task[FewShotInput, PromptOutput]):
"""Runs few-shot completions on a model.
Vanilla models work best with a show-don't-tell approach. Few-shot prompts illustrate
the output that is expected from the model.
Args:
client: Aleph Alpha client instance for running model related API calls.
model: The name of the model that should handle the prompt.
Vanilla models work best with few-shot prompting.
These include 'luminous-base', 'extended' & 'supreme'.
Attributes:
FEW_SHOT_PROMPT_TEMPLATE: The prompt-template used to build the actual `Prompt` sent
to the inference API.
Example:
>>> import os
>>> from intelligence_layer.connectors import LimitedConcurrencyClient
>>> from intelligence_layer.core import (
... FewShot,
... FewShotConfig,
... FewShotExample,
... FewShotInput,
... InMemoryTracer,
... )
>>> client = LimitedConcurrencyClient.from_token(os.getenv("AA_TOKEN"))
>>> task = FewShot(client, model="luminous-base")
>>> input = FewShotInput(
... input="What is the capital of Germany?",
... few_shot_config=FewShotConfig(
... instruction="Answer each question.",
... examples=[
... FewShotExample(input="How high is Mount Everest?", response="8848 metres."),
... FewShotExample(input="When was Caesar killed?", response="44 BC."),
... ],
... input_prefix="Question",
... response_prefix="Answer",
... ),
... )
>>> output = task.run(input, InMemoryTracer())
"""

FEW_SHOT_PROMPT_TEMPLATE = """{% promptrange instruction %}{{instruction}}
{% for example in few_shot_examples %}###
{{input_prefix}}: {{ example.input }}
{{response_prefix}}: {{ example.response }}
{% endfor %}{% endpromptrange %}###
{{input_prefix}}: {% promptrange input %}{{input}}{% endpromptrange %}
{{response_prefix}}:"""

def __init__(self, client: AlephAlphaClientProtocol, model: str) -> None:
super().__init__()
self._client = client
self._completion = Complete(client)
self._model = model

def do_run(self, input: FewShotInput, task_span: TaskSpan) -> PromptOutput:
prompt = PromptTemplate(self.FEW_SHOT_PROMPT_TEMPLATE).to_rich_prompt(
instruction=input.few_shot_config.instruction,
input=input.input,
few_shot_examples=[
e.model_dump() for e in input.few_shot_config.examples
], # liquid can't handle classes, thus serializing
input_prefix=input.few_shot_config.input_prefix,
response_prefix=input.few_shot_config.response_prefix,
)
response = self._complete(
prompt,
input.maximum_response_tokens,
input.few_shot_config.additional_stop_sequences,
self._model,
task_span,
)
return PromptOutput(response=response, rich_prompt=prompt)

def _complete(
self,
prompt: Prompt,
maximum_tokens: int,
additional_stop_sequences: Sequence[str],
model: str,
task_span: TaskSpan,
) -> CompletionResponse:
request = CompletionRequest(
prompt,
maximum_tokens=maximum_tokens,
stop_sequences=["###"] + list(additional_stop_sequences),
)
return self._completion.run(
CompleteInput(request=request, model=model),
task_span,
).response
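For reference, the INSTRUCTION_PROMPT_TEMPLATE removed above roughly renders to the following prompt for the docstring example. The snippet below only approximates the liquid rendering (whitespace handling may differ) and omits the PromptRange metadata the real PromptTemplate attaches:

instruction = "Translate the following text to German."
input_text = "An apple a day keeps the doctor away."
response_prefix = ""

# Approximate rendering of the removed INSTRUCTION_PROMPT_TEMPLATE.
prompt = (
    "### Instruction:\n"
    f"{instruction}\n"
    "\n### Input:\n"
    f"{input_text}\n"
    "\n### Response:" + response_prefix
)
print(prompt)
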
1 change: 0 additions & 1 deletion src/intelligence_layer/core/echo.py
@@ -8,7 +8,6 @@
from intelligence_layer.connectors.limited_concurrency_client import (
AlephAlphaClientProtocol,
)
from intelligence_layer.core.complete import Complete, CompleteInput
from intelligence_layer.core.prompt_template import PromptTemplate
from intelligence_layer.core.task import Task, Token
from intelligence_layer.core.tracer import TaskSpan
8 changes: 4 additions & 4 deletions src/intelligence_layer/core/explain.py
@@ -4,6 +4,7 @@
from intelligence_layer.connectors.limited_concurrency_client import (
AlephAlphaClientProtocol,
)
from intelligence_layer.core.model import AlephAlphaModel
from intelligence_layer.core.task import Task
from intelligence_layer.core.tracer import TaskSpan

@@ -18,7 +19,6 @@ class ExplainInput(BaseModel):
"""

request: ExplanationRequest
model: str


class ExplainOutput(BaseModel):
@@ -42,10 +42,10 @@ class Explain(Task[ExplainInput, ExplainOutput]):
client: Aleph Alpha client instance for running model related API calls.
"""

def __init__(self, client: AlephAlphaClientProtocol) -> None:
def __init__(self, model: AlephAlphaModel) -> None:
super().__init__()
self._client = client
self._model = model

def do_run(self, input: ExplainInput, task_span: TaskSpan) -> ExplainOutput:
response = self._client.explain(input.request, input.model)
response = self._model._client.explain(input.request, self._model.name)
return ExplainOutput(response=response)
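Explain is now constructed with an AlephAlphaModel, and the model name no longer travels inside ExplainInput. A hedged usage sketch; the AlephAlphaModel constructor arguments and the ExplanationRequest fields shown here are assumptions:

from aleph_alpha_client import ExplanationRequest, Prompt
from intelligence_layer.core import InMemoryTracer
from intelligence_layer.core.explain import Explain, ExplainInput
from intelligence_layer.core.model import AlephAlphaModel

model = AlephAlphaModel(name="luminous-base")  # assumed constructor signature
task = Explain(model)  # the model name is no longer part of ExplainInput
request = ExplanationRequest(  # field names per aleph_alpha_client; treat as an assumption
    prompt=Prompt.from_text("An apple a day keeps the doctor away."),
    target=" keeps the doctor away.",
)
output = task.run(ExplainInput(request=request), InMemoryTracer())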