diff --git a/.github/workflows/build-docker-image.yaml b/.github/workflows/build-docker-image.yaml new file mode 100644 index 00000000..7f2bd6e3 --- /dev/null +++ b/.github/workflows/build-docker-image.yaml @@ -0,0 +1,120 @@ +name: Build Docker Image + +on: + workflow_call: + inputs: + build_args: + required: false + default: "" + type: string + cache_id: + required: true + type: string + extract_flavor: + required: false + default: "" + type: string + image_name: + required: true + type: string + image_tag: + required: false + default: "" + type: string + registry: + required: false + default: ghcr.io + type: string + +env: + FULL_IMAGE_NAME: ${{ inputs.registry }}/${{ inputs.image_name }} + +jobs: + build-image: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + strategy: + fail-fast: false + matrix: + platform: + - linux/amd64 + - linux/arm64 + + steps: + - name: Prepare + run: | + platform=${{ matrix.platform }} + echo "PLATFORM_PAIR=${platform//\//-}" >> $GITHUB_ENV + + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Log in to the Container registry + uses: docker/login-action@v3 + with: + registry: ${{ inputs.registry }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract metadata for Docker images + id: meta + uses: docker/metadata-action@v5 + with: + images: ${{ env.FULL_IMAGE_NAME }} + tags: | + type=ref,event=branch + type=ref,event=tag + type=sha,prefix=git- + type=semver,pattern={{version}} + type=semver,pattern={{major}}.{{minor}} + ${{ inputs.image_tag }} + flavor: | + latest=${{ github.ref == 'refs/heads/main' }} + ${{ inputs.extract_flavor }} + + - name: Extract metadata for Docker cache + id: cache-meta + uses: docker/metadata-action@v5 + with: + images: ${{ env.FULL_IMAGE_NAME }} + tags: | + type=ref,event=branch + flavor: | + 
prefix=cache-${{ inputs.cache_id }}-${{ matrix.platform }}- + + - name: Build Docker image + uses: docker/build-push-action@v5 + id: build + with: + context: . + push: true + platforms: ${{ matrix.platform }} + labels: ${{ steps.meta.outputs.labels }} + outputs: type=image,name=${{ env.FULL_IMAGE_NAME }},push-by-digest=true,name-canonical=true,push=true + cache-from: type=registry,ref=${{ steps.cache-meta.outputs.tags }} + cache-to: type=registry,ref=${{ steps.cache-meta.outputs.tags }},mode=max + build-args: | + BUILD_HASH=${{ github.sha }} + ${{ inputs.build_args }} + + - name: Export digest + run: | + mkdir -p /tmp/digests + digest="${{ steps.build.outputs.digest }}" + touch "/tmp/digests/${digest#sha256:}" + + - name: Upload digest + uses: actions/upload-artifact@v4 + with: + name: digests-${{ inputs.cache_id }}-${{ env.PLATFORM_PAIR }} + path: /tmp/digests/* + if-no-files-found: error + retention-days: 1 diff --git a/.github/workflows/docker-build.yaml b/.github/workflows/docker-build.yaml index 5c6b4158..62398b43 100644 --- a/.github/workflows/docker-build.yaml +++ b/.github/workflows/docker-build.yaml @@ -1,409 +1,60 @@ name: Create and publish Docker images with specific build args on: - workflow_dispatch: - push: - branches: - - main - - dev - -env: - REGISTRY: ghcr.io - IMAGE_NAME: ${{ github.repository }} - FULL_IMAGE_NAME: ghcr.io/${{ github.repository }} + workflow_dispatch: + push: + branches: + - main + - dev jobs: - build-main-image: - runs-on: ubuntu-latest - permissions: - contents: read - packages: write - strategy: - fail-fast: false - matrix: - platform: - - linux/amd64 - - linux/arm64 - - steps: - - name: Prepare - run: | - platform=${{ matrix.platform }} - echo "PLATFORM_PAIR=${platform//\//-}" >> $GITHUB_ENV - - - name: Checkout repository - uses: actions/checkout@v4 - - - name: Set up QEMU - uses: docker/setup-qemu-action@v3 - - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - - name: Log in to the Container registry - 
uses: docker/login-action@v3 - with: - registry: ${{ env.REGISTRY }} - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - - - name: Extract metadata for Docker images (default latest tag) - id: meta - uses: docker/metadata-action@v5 - with: - images: ${{ env.FULL_IMAGE_NAME }} - tags: | - type=ref,event=branch - type=ref,event=tag - type=sha,prefix=git- - type=semver,pattern={{version}} - type=semver,pattern={{major}}.{{minor}} - flavor: | - latest=${{ github.ref == 'refs/heads/main' }} - - - name: Extract metadata for Docker cache - id: cache-meta - uses: docker/metadata-action@v5 - with: - images: ${{ env.FULL_IMAGE_NAME }} - tags: | - type=ref,event=branch - flavor: | - prefix=cache-${{ matrix.platform }}- - - - name: Build Docker image (latest) - uses: docker/build-push-action@v5 - id: build - with: - context: . - push: true - platforms: ${{ matrix.platform }} - labels: ${{ steps.meta.outputs.labels }} - outputs: type=image,name=${{ env.FULL_IMAGE_NAME }},push-by-digest=true,name-canonical=true,push=true - cache-from: type=registry,ref=${{ steps.cache-meta.outputs.tags }} - cache-to: type=registry,ref=${{ steps.cache-meta.outputs.tags }},mode=max - build-args: | - BUILD_HASH=${{ github.sha }} - - - name: Export digest - run: | - mkdir -p /tmp/digests - digest="${{ steps.build.outputs.digest }}" - touch "/tmp/digests/${digest#sha256:}" - - - name: Upload digest - uses: actions/upload-artifact@v4 - with: - name: digests-main-${{ env.PLATFORM_PAIR }} - path: /tmp/digests/* - if-no-files-found: error - retention-days: 1 - - build-cuda-image: - runs-on: ubuntu-latest - permissions: - contents: read - packages: write - strategy: - fail-fast: false - matrix: - platform: - - linux/amd64 - - linux/arm64 - - steps: - - name: Prepare - run: | - platform=${{ matrix.platform }} - echo "PLATFORM_PAIR=${platform//\//-}" >> $GITHUB_ENV - - - name: Checkout repository - uses: actions/checkout@v4 - - - name: Set up QEMU - uses: docker/setup-qemu-action@v3 - 
- - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - - name: Log in to the Container registry - uses: docker/login-action@v3 - with: - registry: ${{ env.REGISTRY }} - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - - - name: Extract metadata for Docker images (cuda tag) - id: meta - uses: docker/metadata-action@v5 - with: - images: ${{ env.FULL_IMAGE_NAME }} - tags: | - type=ref,event=branch - type=ref,event=tag - type=sha,prefix=git- - type=semver,pattern={{version}} - type=semver,pattern={{major}}.{{minor}} - type=raw,enable=${{ github.ref == 'refs/heads/main' }},prefix=,suffix=,value=cuda - flavor: | - latest=${{ github.ref == 'refs/heads/main' }} - suffix=-cuda,onlatest=true - - - name: Extract metadata for Docker cache - id: cache-meta - uses: docker/metadata-action@v5 - with: - images: ${{ env.FULL_IMAGE_NAME }} - tags: | - type=ref,event=branch - flavor: | - prefix=cache-cuda-${{ matrix.platform }}- - - - name: Build Docker image (cuda) - uses: docker/build-push-action@v5 - id: build - with: - context: . 
- push: true - platforms: ${{ matrix.platform }} - labels: ${{ steps.meta.outputs.labels }} - outputs: type=image,name=${{ env.FULL_IMAGE_NAME }},push-by-digest=true,name-canonical=true,push=true - cache-from: type=registry,ref=${{ steps.cache-meta.outputs.tags }} - cache-to: type=registry,ref=${{ steps.cache-meta.outputs.tags }},mode=max - build-args: | - BUILD_HASH=${{ github.sha }} - USE_CUDA=true - - - name: Export digest - run: | - mkdir -p /tmp/digests - digest="${{ steps.build.outputs.digest }}" - touch "/tmp/digests/${digest#sha256:}" - - - name: Upload digest - uses: actions/upload-artifact@v4 - with: - name: digests-cuda-${{ env.PLATFORM_PAIR }} - path: /tmp/digests/* - if-no-files-found: error - retention-days: 1 - - build-minimum-image: - runs-on: ubuntu-latest - permissions: - contents: read - packages: write - strategy: - fail-fast: false - matrix: - platform: - - linux/amd64 - - linux/arm64 - - steps: - - name: Prepare - run: | - platform=${{ matrix.platform }} - echo "PLATFORM_PAIR=${platform//\//-}" >> $GITHUB_ENV - - - name: Checkout repository - uses: actions/checkout@v4 - - - name: Set up QEMU - uses: docker/setup-qemu-action@v3 - - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - - name: Log in to the Container registry - uses: docker/login-action@v3 - with: - registry: ${{ env.REGISTRY }} - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - - - name: Extract metadata for Docker images (default latest tag) - id: meta - uses: docker/metadata-action@v5 - with: - images: ${{ env.FULL_IMAGE_NAME }} - tags: | - type=ref,event=branch - type=ref,event=tag - type=sha,prefix=git- - type=semver,pattern={{version}} - type=semver,pattern={{major}}.{{minor}} - flavor: | - latest=${{ github.ref == 'refs/heads/main' }} - - - name: Extract metadata for Docker cache - id: cache-meta - uses: docker/metadata-action@v5 - with: - images: ${{ env.FULL_IMAGE_NAME }} - tags: | - type=ref,event=branch - flavor: | - 
prefix=cache-${{ matrix.platform }}- - - - name: Build Docker image (latest) - uses: docker/build-push-action@v5 - id: build - with: - context: . - push: true - platforms: ${{ matrix.platform }} - labels: ${{ steps.meta.outputs.labels }} - outputs: type=image,name=${{ env.FULL_IMAGE_NAME }},push-by-digest=true,name-canonical=true,push=true - cache-from: type=registry,ref=${{ steps.cache-meta.outputs.tags }} - cache-to: type=registry,ref=${{ steps.cache-meta.outputs.tags }},mode=max - build-args: | - BUILD_HASH=${{ github.sha }} - MINIMUM_BUILD=true - - - name: Export digest - run: | - mkdir -p /tmp/digests - digest="${{ steps.build.outputs.digest }}" - touch "/tmp/digests/${digest#sha256:}" - - - name: Upload digest - uses: actions/upload-artifact@v4 - with: - name: digests-minimum-${{ env.PLATFORM_PAIR }} - path: /tmp/digests/* - if-no-files-found: error - retention-days: 1 - - merge-main-images: - runs-on: ubuntu-latest - needs: [build-main-image] - steps: - - name: Download digests - uses: actions/download-artifact@v4 - with: - pattern: digests-main-* - path: /tmp/digests - merge-multiple: true - - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - - name: Log in to the Container registry - uses: docker/login-action@v3 - with: - registry: ${{ env.REGISTRY }} - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - - - name: Extract metadata for Docker images (default latest tag) - id: meta - uses: docker/metadata-action@v5 - with: - images: ${{ env.FULL_IMAGE_NAME }} - tags: | - type=ref,event=branch - type=ref,event=tag - type=sha,prefix=git- - flavor: | - latest=${{ github.ref == 'refs/heads/main' }} - - - name: Create manifest list and push - working-directory: /tmp/digests - run: | - docker buildx imagetools create $(jq -cr '.tags | map("-t " + .) 
| join(" ")' <<< "$DOCKER_METADATA_OUTPUT_JSON") \ - $(printf '${{ env.FULL_IMAGE_NAME }}@sha256:%s ' *) - - - name: Inspect image - run: | - docker buildx imagetools inspect ${{ env.FULL_IMAGE_NAME }}:${{ steps.meta.outputs.version }} - - merge-cuda-images: - runs-on: ubuntu-latest - needs: [build-cuda-image] - steps: - - name: Download digests - uses: actions/download-artifact@v4 - with: - pattern: digests-cuda-* - path: /tmp/digests - merge-multiple: true - - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - - name: Log in to the Container registry - uses: docker/login-action@v3 - with: - registry: ${{ env.REGISTRY }} - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - - - name: Extract metadata for Docker images (default latest tag) - id: meta - uses: docker/metadata-action@v5 - with: - images: ${{ env.FULL_IMAGE_NAME }} - tags: | - type=ref,event=branch - type=ref,event=tag - type=sha,prefix=git- - type=raw,enable=${{ github.ref == 'refs/heads/main' }},prefix=,suffix=,value=cuda - flavor: | - latest=${{ github.ref == 'refs/heads/main' }} - suffix=-cuda,onlatest=true - - - name: Create manifest list and push - working-directory: /tmp/digests - run: | - docker buildx imagetools create $(jq -cr '.tags | map("-t " + .) 
| join(" ")' <<< "$DOCKER_METADATA_OUTPUT_JSON") \ - $(printf '${{ env.FULL_IMAGE_NAME }}@sha256:%s ' *) - - - name: Inspect image - run: | - docker buildx imagetools inspect ${{ env.FULL_IMAGE_NAME }}:${{ steps.meta.outputs.version }} - - merge-minimum-images: - runs-on: ubuntu-latest - needs: [build-minimum-image] - steps: - - name: Download digests - uses: actions/download-artifact@v4 - with: - pattern: digests-minimum-* - path: /tmp/digests - merge-multiple: true - - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - - name: Log in to the Container registry - uses: docker/login-action@v3 - with: - registry: ${{ env.REGISTRY }} - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - - - name: Extract metadata for Docker images (default latest tag) - id: meta - uses: docker/metadata-action@v5 - with: - images: ${{ env.FULL_IMAGE_NAME }} - tags: | - type=ref,event=branch - type=ref,event=tag - type=sha,prefix=git- - type=raw,enable=${{ github.ref == 'refs/heads/main' }},prefix=,suffix=,value=minimum - flavor: | - latest=${{ github.ref == 'refs/heads/main' }} - suffix=-minimum,onlatest=true - - - name: Create manifest list and push - working-directory: /tmp/digests - run: | - docker buildx imagetools create $(jq -cr '.tags | map("-t " + .) 
| join(" ")' <<< "$DOCKER_METADATA_OUTPUT_JSON") \ - $(printf '${{ env.FULL_IMAGE_NAME }}@sha256:%s ' *) - - - name: Inspect image - run: | - docker buildx imagetools inspect ${{ env.FULL_IMAGE_NAME }}:${{ steps.meta.outputs.version }} + build-main-image: + uses: ./.github/workflows/build-docker-image.yaml + with: + image_name: ${{ github.repository }} + cache_id: main + + build-cuda-image: + uses: ./.github/workflows/build-docker-image.yaml + with: + image_name: ${{ github.repository }} + cache_id: cuda + image_tag: type=raw,enable=${{ github.ref == 'refs/heads/main' }},prefix=,suffix=,value=cuda + extract_flavor: suffix=-cuda,onlatest=true + build_args: | + USE_CUDA=true + + build-minimum-image: + uses: ./.github/workflows/build-docker-image.yaml + with: + image_name: ${{ github.repository }} + cache_id: minimum + image_tag: type=raw,enable=${{ github.ref == 'refs/heads/main' }},prefix=,suffix=,value=minimum + extract_flavor: suffix=-minimum,onlatest=true + build_args: | + MINIMUM_BUILD=true + + merge-main-images: + uses: ./.github/workflows/merge-docker-images.yaml + needs: [build-main-image] + with: + image_name: ${{ github.repository }} + cache_id: main + + merge-cuda-images: + uses: ./.github/workflows/merge-docker-images.yaml + needs: [build-cuda-image] + with: + image_name: ${{ github.repository }} + cache_id: cuda + extract_flavor: suffix=-cuda,onlatest=true + extract_tags: type=raw,enable=${{ github.ref == 'refs/heads/main' }},prefix=,suffix=,value=cuda + + merge-minimum-images: + uses: ./.github/workflows/merge-docker-images.yaml + needs: [build-minimum-image] + with: + image_name: ${{ github.repository }} + cache_id: minimum + extract_flavor: suffix=-minimum,onlatest=true + extract_tags: type=raw,enable=${{ github.ref == 'refs/heads/main' }},prefix=,suffix=,value=minimum diff --git a/.github/workflows/merge-docker-images.yaml b/.github/workflows/merge-docker-images.yaml new file mode 100644 index 00000000..512f42cf --- /dev/null +++ 
b/.github/workflows/merge-docker-images.yaml @@ -0,0 +1,71 @@ +name: Merge Docker Images + +on: + workflow_call: + inputs: + cache_id: + required: true + type: string + extract_flavor: + required: false + default: "" + type: string + extract_tags: + required: false + default: "" + type: string + image_name: + required: true + type: string + registry: + required: false + default: ghcr.io + type: string + +env: + FULL_IMAGE_NAME: ${{ inputs.registry }}/${{ inputs.image_name }} + +jobs: + merge-images: + runs-on: ubuntu-latest + steps: + - name: Download digests + uses: actions/download-artifact@v4 + with: + pattern: digests-${{ inputs.cache_id }}-* + path: /tmp/digests + merge-multiple: true + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Log in to the Container registry + uses: docker/login-action@v3 + with: + registry: ${{ inputs.registry }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract metadata for Docker images + id: meta + uses: docker/metadata-action@v5 + with: + images: ${{ env.FULL_IMAGE_NAME }} + tags: | + type=ref,event=branch + type=ref,event=tag + type=sha,prefix=git- + ${{ inputs.extract_tags }} + flavor: | + latest=${{ github.ref == 'refs/heads/main' }} + ${{ inputs.extract_flavor }} + + - name: Create manifest list and push + working-directory: /tmp/digests + run: | + docker buildx imagetools create $(jq -cr '.tags | map("-t " + .) | join(" ")' <<< "$DOCKER_METADATA_OUTPUT_JSON") \ + $(printf '${{ env.FULL_IMAGE_NAME }}@sha256:%s ' *) + + - name: Inspect image + run: | + docker buildx imagetools inspect ${{ env.FULL_IMAGE_NAME }}:${{ steps.meta.outputs.version }} diff --git a/README.md b/README.md index 89e0a7a2..5e78ab42 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,10 @@ # Pipelines: UI-Agnostic OpenAI API Plugin Framework +> [!TIP] +> If your goal is simply to add support for additional providers like Anthropic or basic filters, you likely don't need Pipelines. 
For those cases, Open WebUI Functions are a better fit—they're built-in, much more convenient, and easier to configure. Pipelines, however, comes into play when you're dealing with computationally heavy tasks (e.g., running large models or complex logic) that you want to offload from your main Open WebUI instance for better performance and scalability. + + Welcome to **Pipelines**, an [Open WebUI](https://github.com/open-webui) initiative. Pipelines bring modular, customizable workflows to any UI client supporting OpenAI API specs – and much more! Easily extend functionalities, integrate unique logic, and create dynamic workflows with just a few lines of code. ## 🚀 Why Choose Pipelines? diff --git a/blueprints/function_calling_blueprint.py b/blueprints/function_calling_blueprint.py index 4e0f496b..f4739b06 100644 --- a/blueprints/function_calling_blueprint.py +++ b/blueprints/function_calling_blueprint.py @@ -11,6 +11,16 @@ get_tools_specs, ) +# System prompt for function calling +DEFAULT_SYSTEM_PROMPT = ( + """Tools: {} + +If a function tool doesn't match the query, return an empty string. Else, pick a +function tool, fill in the parameters from the function tool's schema, and +return it in the format {{ "name": \"functionName\", "parameters": {{ "key": +"value" }} }}. Only pick a function if the user asks. Only return the object. Do not return any other text." +""" + ) class Pipeline: class Valves(BaseModel): @@ -29,7 +39,7 @@ class Valves(BaseModel): TASK_MODEL: str TEMPLATE: str - def __init__(self): + def __init__(self, prompt: str | None = None) -> None: # Pipeline filters are only compatible with Open WebUI # You can think of filter pipeline as a middleware that can be used to edit the form data before it is sent to the OpenAI API. self.type = "filter" @@ -40,6 +50,8 @@ def __init__(self): # The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes. 
# self.id = "function_calling_blueprint" self.name = "Function Calling Blueprint" + self.prompt = prompt or DEFAULT_SYSTEM_PROMPT + self.tools: object = None # Initialize valves self.valves = self.Valves( @@ -87,14 +99,45 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: # Get the tools specs tools_specs = get_tools_specs(self.tools) - # System prompt for function calling - fc_system_prompt = ( - f"Tools: {json.dumps(tools_specs, indent=2)}" - + """ -If a function tool doesn't match the query, return an empty string. Else, pick a function tool, fill in the parameters from the function tool's schema, and return it in the format { "name": \"functionName\", "parameters": { "key": "value" } }. Only pick a function if the user asks. Only return the object. Do not return any other text." -""" - ) + prompt = self.prompt.format(json.dumps(tools_specs, indent=2)) + content = "History:\n" + "\n".join( + [ + f"{message['role']}: {message['content']}" + for message in body["messages"][::-1][:4] + ] + ) + f"Query: {user_message}" + + result = self.run_completion(prompt, content) + messages = self.call_function(result, body["messages"]) + + return {**body, "messages": messages} + + # Call the function + def call_function(self, result, messages: list[dict]) -> list[dict]: + if "name" not in result: + return messages + + function = getattr(self.tools, result["name"]) + function_result = None + try: + function_result = function(**result["parameters"]) + except Exception as e: + print(e) + + # Add the function result to the system prompt + if function_result: + system_prompt = self.valves.TEMPLATE.replace( + "{{CONTEXT}}", function_result + ) + + messages = add_or_update_system_message( + system_prompt, messages + ) + # Return the updated messages + return messages + + def run_completion(self, system_prompt: str, content: str) -> dict: r = None try: # Call the OpenAI API to get the function response @@ -105,18 +148,11 @@ async def inlet(self, body: dict, 
user: Optional[dict] = None) -> dict: "messages": [ { "role": "system", - "content": fc_system_prompt, + "content": system_prompt, }, { "role": "user", - "content": "History:\n" - + "\n".join( - [ - f"{message['role']}: {message['content']}" - for message in body["messages"][::-1][:4] - ] - ) - + f"Query: {user_message}", + "content": content, }, ], # TODO: dynamically add response_format? @@ -137,29 +173,7 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: if content != "": result = json.loads(content) print(result) - - # Call the function - if "name" in result: - function = getattr(self.tools, result["name"]) - function_result = None - try: - function_result = function(**result["parameters"]) - except Exception as e: - print(e) - - # Add the function result to the system prompt - if function_result: - system_prompt = self.valves.TEMPLATE.replace( - "{{CONTEXT}}", function_result - ) - - print(system_prompt) - messages = add_or_update_system_message( - system_prompt, body["messages"] - ) - - # Return the updated messages - return {**body, "messages": messages} + return result except Exception as e: print(f"Error: {e}") @@ -170,4 +184,4 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: except: pass - return body + return {} diff --git a/examples/pipelines/integrations/dify_pipeline.py b/examples/pipelines/integrations/dify_pipeline.py new file mode 100644 index 00000000..86e412ac --- /dev/null +++ b/examples/pipelines/integrations/dify_pipeline.py @@ -0,0 +1,84 @@ +from typing import List, Union, Generator, Iterator, Optional +from pprint import pprint +import requests, json, warnings + +# Uncomment to disable SSL verification warnings if needed. 
+# warnings.filterwarnings('ignore', message='Unverified HTTPS request') + +class Pipeline: + def __init__(self): + self.name = "Dify Agent Pipeline" + self.api_url = "http://dify.hostname/v1/workflows/run" # Set correct hostname + self.api_key = "app-dify-key" # Insert your actual API key here. + self.api_request_stream = True # Dify supports streaming + self.verify_ssl = True + self.debug = False + + async def on_startup(self): + # This function is called when the server is started. + print(f"on_startup: {__name__}") + pass + + async def on_shutdown(self): + # This function is called when the server is shutdown. + print(f"on_shutdown: {__name__}") + pass + + async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: + # This function is called before the OpenAI API request is made. You can modify the form data before it is sent to the OpenAI API. + print(f"inlet: {__name__}") + if self.debug: + print(f"inlet: {__name__} - body:") + pprint(body) + print(f"inlet: {__name__} - user:") + pprint(user) + return body + + async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: + # This function is called after the OpenAI API response is completed. You can modify the messages after they are received from the OpenAI API. + print(f"outlet: {__name__}") + if self.debug: + print(f"outlet: {__name__} - body:") + pprint(body) + print(f"outlet: {__name__} - user:") + pprint(user) + return body + + def pipe(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> Union[str, Generator, Iterator]: + print(f"pipe: {__name__}") + + if self.debug: + print(f"pipe: {__name__} - received message from user: {user_message}") + + # Set response mode Dify API parameter + if self.api_request_stream is True: + response_mode = "streaming" + else: + response_mode = "blocking" + + # This function triggers the workflow using the specified API. 
+ headers = { + 'Authorization': f'Bearer {self.api_key}', + 'Content-Type': 'application/json' + } + data = { + "inputs": {"prompt": user_message}, + "response_mode": response_mode, + "user": body["user"]["email"] + } + + response = requests.post(self.api_url, headers=headers, json=data, stream=self.api_request_stream, verify=self.verify_ssl) + if response.status_code == 200: + # Process and yield each chunk from the response + for line in response.iter_lines(): + if line: + try: + # Remove 'data: ' prefix and parse JSON + json_data = json.loads(line.decode('utf-8').replace('data: ', '')) + # Extract and yield only the 'text' field from the nested 'data' object + if 'data' in json_data and 'text' in json_data['data']: + yield json_data['data']['text'] + except json.JSONDecodeError: + print(f"Failed to parse JSON: {line}") + else: + yield f"Workflow request failed with status code: {response.status_code}" diff --git a/examples/pipelines/integrations/n8n_pipeline.py b/examples/pipelines/integrations/n8n_pipeline.py new file mode 100644 index 00000000..51e0e4d2 --- /dev/null +++ b/examples/pipelines/integrations/n8n_pipeline.py @@ -0,0 +1,79 @@ +from typing import List, Union, Generator, Iterator, Optional +from pprint import pprint +import requests, json, warnings + +# Uncomment to disable SSL verification warnings if needed. +# warnings.filterwarnings('ignore', message='Unverified HTTPS request') + +class Pipeline: + def __init__(self): + self.name = "N8N Agent Pipeline" + self.api_url = "https://n8n.host/webhook/myflow" # Set correct hostname + self.api_key = "" # Insert your actual API key here + self.verify_ssl = True + self.debug = False + # Please note that N8N does not support streaming responses + + async def on_startup(self): + # This function is called when the server is started. + print(f"on_startup: {__name__}") + pass + + async def on_shutdown(self): + # This function is called when the server is shutdown. 
+ print(f"on_shutdown: {__name__}") + pass + + async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: + # This function is called before the OpenAI API request is made. You can modify the form data before it is sent to the OpenAI API. + print(f"inlet: {__name__}") + if self.debug: + print(f"inlet: {__name__} - body:") + pprint(body) + print(f"inlet: {__name__} - user:") + pprint(user) + return body + + async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: + # This function is called after the OpenAI API response is completed. You can modify the messages after they are received from the OpenAI API. + print(f"outlet: {__name__}") + if self.debug: + print(f"outlet: {__name__} - body:") + pprint(body) + print(f"outlet: {__name__} - user:") + pprint(user) + return body + + def pipe(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> Union[str, Generator, Iterator]: + # This is where you can add your custom pipelines like RAG. + print(f"pipe: {__name__}") + + if self.debug: + print(f"pipe: {__name__} - received message from user: {user_message}") + + # This function triggers the workflow using the specified API. + headers = { + 'Authorization': f'Bearer {self.api_key}', + 'Content-Type': 'application/json' + } + data = { + "inputs": {"prompt": user_message}, + "user": body["user"]["email"] + } + + response = requests.post(self.api_url, headers=headers, json=data, verify=self.verify_ssl) + if response.status_code == 200: + # Process and yield each chunk from the response + try: + for line in response.iter_lines(): + if line: + # Decode each line assuming UTF-8 encoding and directly parse it as JSON + json_data = json.loads(line.decode('utf-8')) + # Check if 'output' exists in json_data and yield it + if 'output' in json_data: + yield json_data['output'] + except json.JSONDecodeError as e: + print(f"Failed to parse JSON from line. Error: {str(e)}") + yield "Error in JSON parsing." 
+ else: + yield f"Workflow request failed with status code: {response.status_code}"
diff --git a/examples/pipelines/providers/anthropic_manifold_pipeline.py b/examples/pipelines/providers/anthropic_manifold_pipeline.py
index 81fa910f..0e06f147 100644
--- a/examples/pipelines/providers/anthropic_manifold_pipeline.py
+++ b/examples/pipelines/providers/anthropic_manifold_pipeline.py
@@ -46,7 +46,7 @@ def get_anthropic_models(self):
             {"id": "claude-3-haiku-20240307", "name": "claude-3-haiku"},
             {"id": "claude-3-opus-20240229", "name": "claude-3-opus"},
             {"id": "claude-3-sonnet-20240229", "name": "claude-3-sonnet"},
-            {"id": "claude-3-5-sonnet-20240620", "name": "claude-3.5-sonnet"},
+            {"id": "claude-3-5-sonnet-20241022", "name": "claude-3.5-sonnet"},
         ]
 
     async def on_startup(self):
diff --git a/examples/pipelines/providers/azure_jais_core42_pipeline.py b/examples/pipelines/providers/azure_jais_core42_pipeline.py
new file mode 100644
index 00000000..2b8e8a79
--- /dev/null
+++ b/examples/pipelines/providers/azure_jais_core42_pipeline.py
@@ -0,0 +1,235 @@
+"""
+title: Jais Azure Pipeline with Stream Handling Fix
+author: Abdessalaam Al-Alestini
+date: 2024-06-20
+version: 1.3
+license: MIT
+description: A pipeline for generating text using the Jais model via Azure AI Inference API, with fixed stream handling.
+About Jais: https://inceptionai.ai/jais/
+requirements: azure-ai-inference
+environment_variables: AZURE_INFERENCE_CREDENTIAL, AZURE_INFERENCE_ENDPOINT, MODEL_ID
+"""
+
+import os
+import json
+import logging
+from typing import List, Union, Generator, Iterator, Tuple
+from pydantic import BaseModel
+from azure.ai.inference import ChatCompletionsClient
+from azure.core.credentials import AzureKeyCredential
+from azure.ai.inference.models import SystemMessage, UserMessage, AssistantMessage
+
+# Set up logging
+logging.basicConfig(level=logging.DEBUG)
+logger = logging.getLogger(__name__)
+
+
+def pop_system_message(messages: List[dict]) -> Tuple[str, List[dict]]:
+    """
+    Extract the system message from the list of messages.
+
+    Args:
+        messages (List[dict]): List of message dictionaries.
+
+    Returns:
+        Tuple[str, List[dict]]: A tuple containing the system message (or empty string) and the updated list of messages.
+    """
+    system_message = ""
+    updated_messages = []
+
+    for message in messages:
+        if message['role'] == 'system':
+            system_message = message['content']
+        else:
+            updated_messages.append(message)
+
+    return system_message, updated_messages
+
+
+class Pipeline:
+    """Manifold pipeline that sends chat requests to a Jais model through the Azure AI Inference client."""
+
+    class Valves(BaseModel):
+        AZURE_INFERENCE_CREDENTIAL: str = ""
+        AZURE_INFERENCE_ENDPOINT: str = ""
+        MODEL_ID: str = "jais-30b-chat"
+
+    def __init__(self):
+        self.type = "manifold"
+        self.id = "jais-azure"
+        self.name = "jais-azure/"
+
+        # Valves come from the environment; placeholder strings are used when a variable is unset.
+        self.valves = self.Valves(
+            **{
+                "AZURE_INFERENCE_CREDENTIAL":
+                os.getenv("AZURE_INFERENCE_CREDENTIAL",
+                          "your-azure-inference-key-here"),
+                "AZURE_INFERENCE_ENDPOINT":
+                os.getenv("AZURE_INFERENCE_ENDPOINT",
+                          "your-azure-inference-endpoint-here"),
+                "MODEL_ID":
+                os.getenv("MODEL_ID", "jais-30b-chat"),
+            })
+        self.update_client()
+
+    def update_client(self):
+        """Rebuild the Azure client from the current endpoint and credential valves."""
+        self.client = ChatCompletionsClient(
+            endpoint=self.valves.AZURE_INFERENCE_ENDPOINT,
+            credential=AzureKeyCredential(
+                self.valves.AZURE_INFERENCE_CREDENTIAL))
+
+    def get_jais_models(self):
+        """Return the model entries this manifold advertises."""
+        return [
+            {
+                "id": "jais-30b-chat",
+                "name": "Jais 30B Chat"
+            },
+        ]
+
+    async def on_startup(self):
+        logger.info(f"on_startup:{__name__}")
+        pass
+
+    async def on_shutdown(self):
+        logger.info(f"on_shutdown:{__name__}")
+        pass
+
+    async def on_valves_updated(self):
+        # Rebuild the client so changed endpoint/credential valves take effect.
+        self.update_client()
+
+    def pipelines(self) -> List[dict]:
+        return self.get_jais_models()
+
+    def pipe(self, user_message: str, model_id: str, messages: List[dict],
+             body: dict) -> Union[str, Generator, Iterator]:
+        """
+        Run one chat request against the configured Jais model.
+
+        Returns the completion text, or a JSON string of the form
+        {"error": "..."} when anything in the request path raises.
+        """
+        try:
+            logger.debug(
+                f"Received request - user_message: {user_message}, model_id: {model_id}"
+            )
+            logger.debug(f"Messages: {json.dumps(messages, indent=2)}")
+            logger.debug(f"Body: {json.dumps(body, indent=2)}")
+
+            # Remove unnecessary keys
+            for key in ['user', 'chat_id', 'title']:
+                body.pop(key, None)
+
+            system_message, messages = pop_system_message(messages)
+
+            # Prepare messages for Jais
+            # pop_system_message() has already removed every 'system' entry, so the
+            # SystemMessage arm below is not reached; non-user roles map to AssistantMessage.
+            jais_messages = [SystemMessage(
+                content=system_message)] if system_message else []
+            jais_messages += [
+                UserMessage(content=msg['content']) if msg['role'] == 'user'
+                else SystemMessage(content=msg['content']) if msg['role']
+                == 'system' else AssistantMessage(content=msg['content'])
+                for msg in messages
+            ]
+
+            # Prepare the payload
+            allowed_params = {
+                'temperature', 'max_tokens', 'presence_penalty',
+                'frequency_penalty', 'top_p'
+            }
+            filtered_body = {
+                k: v
+                for k, v in body.items() if k in allowed_params
+            }
+
+            logger.debug(f"Prepared Jais messages: {jais_messages}")
+            logger.debug(f"Filtered body: {filtered_body}")
+
+            is_stream = body.get("stream", False)
+            if is_stream:
+                return self.stream_response(jais_messages, filtered_body)
+            else:
+                return self.get_completion(jais_messages, filtered_body)
+        except Exception as e:
+            logger.error(f"Error in pipe: {str(e)}", exc_info=True)
+            return json.dumps({"error": str(e)})
+
+    def stream_response(self, jais_messages: List[Union[SystemMessage, UserMessage, AssistantMessage]], params: dict) -> str:
+        """
+        Request a streamed completion and return the concatenated text.
+
+        The deltas are accumulated here, so the caller receives one string
+        rather than an incremental stream.
+        """
+        try:
+            complete_response = ""
+            response = self.client.complete(messages=jais_messages,
+                                            model=self.valves.MODEL_ID,
+                                            stream=True,
+                                            **params)
+            for update in response:
+                if update.choices:
+                    delta_content = update.choices[0].delta.content
+                    if delta_content:
+                        complete_response += delta_content
+            return complete_response
+        except Exception as e:
+            logger.error(f"Error in stream_response: {str(e)}", exc_info=True)
+            return json.dumps({"error": str(e)})
+
+    def get_completion(self, jais_messages: List[Union[SystemMessage, UserMessage, AssistantMessage]], params: dict) -> str:
+        """Request a non-streamed completion and return its text ("" when there are no choices)."""
+        try:
+            response = self.client.complete(messages=jais_messages,
+                                            model=self.valves.MODEL_ID,
+                                            **params)
+            if response.choices:
+                result = response.choices[0].message.content
+                logger.debug(f"Completion result: {result}")
+                return result
+            else:
+                logger.warning("No choices in completion response")
+                return ""
+        except Exception as e:
+            logger.error(f"Error in get_completion: {str(e)}", exc_info=True)
+            return json.dumps({"error": str(e)})
+
+
+# TEST CASE TO RUN THE PIPELINE
+if __name__ == "__main__":
+    pipeline = Pipeline()
+
+    messages = [{
+        "role": "user",
+        "content": "How many languages are in the world?"
+    }]
+    body = {
+        "temperature": 0.5,
+        "max_tokens": 150,
+        "presence_penalty": 0.1,
+        "frequency_penalty": 0.8,
+        "stream": True  # Change to True to test streaming
+    }
+
+    result = pipeline.pipe(user_message="How many languages are in the world?",
+                           model_id="jais-30b-chat",
+                           messages=messages,
+                           body=body)
+
+    # Handle streaming result
+    if isinstance(result, str):
+        content = json.dumps({"content": result}, ensure_ascii=False)
+        print(content)
+    else:
+        complete_response = ""
+        for part in result:
+            content_delta = json.loads(part).get("delta")
+            if content_delta:
+                complete_response += content_delta
+
+        print(json.dumps({"content": complete_response}, ensure_ascii=False))
diff --git a/examples/pipelines/providers/azure_openai_manifold_pipeline.py b/examples/pipelines/providers/azure_openai_manifold_pipeline.py
new file mode 100644
index 00000000..6f77a449
--- /dev/null
+++ b/examples/pipelines/providers/azure_openai_manifold_pipeline.py
@@ -0,0 +1,113 @@
+from typing import List, Union, Generator, Iterator
+from pydantic import BaseModel
+import requests
+import os
+
+
+class Pipeline:
+    """Manifold pipeline that exposes Azure OpenAI deployments as selectable models."""
+
+    class Valves(BaseModel):
+        # You can add your custom valves here.
+        AZURE_OPENAI_API_KEY: str
+        AZURE_OPENAI_ENDPOINT: str
+        AZURE_OPENAI_API_VERSION: str
+        # Semicolon-separated deployment ids and display names, paired by position.
+        AZURE_OPENAI_MODELS: str
+        AZURE_OPENAI_MODEL_NAMES: str
+
+    def __init__(self):
+        self.type = "manifold"
+        self.name = "Azure OpenAI: "
+        self.valves = self.Valves(
+            **{
+                "AZURE_OPENAI_API_KEY": os.getenv("AZURE_OPENAI_API_KEY", "your-azure-openai-api-key-here"),
+                "AZURE_OPENAI_ENDPOINT": os.getenv("AZURE_OPENAI_ENDPOINT", "your-azure-openai-endpoint-here"),
+                "AZURE_OPENAI_API_VERSION": os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-01"),
+                "AZURE_OPENAI_MODELS": os.getenv("AZURE_OPENAI_MODELS", "gpt-35-turbo;gpt-4o"),
+                "AZURE_OPENAI_MODEL_NAMES": os.getenv("AZURE_OPENAI_MODEL_NAMES", "GPT-35 Turbo;GPT-4o"),
+            }
+        )
+        self.set_pipelines()
+        pass
+
+    def set_pipelines(self):
+        """Rebuild self.pipelines from the semicolon-separated model valves."""
+        models = self.valves.AZURE_OPENAI_MODELS.split(";")
+        model_names = self.valves.AZURE_OPENAI_MODEL_NAMES.split(";")
+        # zip() stops at the shorter list, so unpaired ids or names are dropped.
+        self.pipelines = [
+            {"id": model, "name": name} for model, name in zip(models, model_names)
+        ]
+        print(f"azure_openai_manifold_pipeline - models: {self.pipelines}")
+        pass
+
+    async def on_valves_updated(self):
+        self.set_pipelines()
+
+    async def on_startup(self):
+        # This function is called when the server is started.
+        print(f"on_startup:{__name__}")
+        pass
+
+    async def on_shutdown(self):
+        # This function is called when the server is stopped.
+        print(f"on_shutdown:{__name__}")
+        pass
+
+    def pipe(
+        self, user_message: str, model_id: str, messages: List[dict], body: dict
+    ) -> Union[str, Generator, Iterator]:
+        """
+        Send a chat completion request to the Azure OpenAI deployment named by model_id.
+
+        Returns an iterator over the raw response lines when body["stream"] is truthy,
+        the decoded JSON body otherwise, or an "Error: ..." string if the request fails.
+        """
+        # This is where you can add your custom pipelines like RAG.
+        print(f"pipe:{__name__}")
+
+        print(messages)
+        print(user_message)
+
+        headers = {
+            "api-key": self.valves.AZURE_OPENAI_API_KEY,
+            "Content-Type": "application/json",
+        }
+
+        url = f"{self.valves.AZURE_OPENAI_ENDPOINT}/openai/deployments/{model_id}/chat/completions?api-version={self.valves.AZURE_OPENAI_API_VERSION}"
+
+        allowed_params = {'messages', 'temperature', 'role', 'content', 'contentPart', 'contentPartImage',
+                          'enhancements', 'dataSources', 'n', 'stream', 'stop', 'max_tokens', 'presence_penalty',
+                          'frequency_penalty', 'logit_bias', 'user', 'function_call', 'functions', 'tools',
+                          'tool_choice', 'top_p', 'logprobs', 'top_logprobs', 'response_format', 'seed'}
+        # remap user field
+        if "user" in body and not isinstance(body["user"], str):
+            body["user"] = body["user"]["id"] if "id" in body["user"] else str(body["user"])
+        filtered_body = {k: v for k, v in body.items() if k in allowed_params}
+        # log fields that were filtered out as a single line
+        if len(body) != len(filtered_body):
+            print(f"Dropped params: {', '.join(set(body.keys()) - set(filtered_body.keys()))}")
+
+        # Bind r before the try so the except branch can tell whether a response exists.
+        r = None
+        try:
+            r = requests.post(
+                url=url,
+                json=filtered_body,
+                headers=headers,
+                stream=True,
+            )
+
+            r.raise_for_status()
+            if body.get("stream", False):
+                return r.iter_lines()
+            else:
+                return r.json()
+        except Exception as e:
+            # requests.Response is falsy for 4xx/5xx, so test against None instead.
+            if r is not None:
+                text = r.text
+                return f"Error: {e} ({text})"
+            else:
+                return f"Error: {e}"
diff --git a/schemas.py b/schemas.py
index 57517dfd..caa0342a 100644
--- a/schemas.py
+++ b/schemas.py
@@ -1,23 +1,13 @@
-from typing import List, Union, Optional
-from pydantic import BaseModel, RootModel, ConfigDict
-
-class ImageContent(BaseModel):
-    type: str
-    image_url: dict
-
-class TextContent(BaseModel):
-    type: str
-    text: str
-
-class MessageContent(RootModel):
-    root: Union[TextContent, ImageContent]
+from typing import List, Optional
+from pydantic import BaseModel, ConfigDict
 
 class OpenAIChatMessage(BaseModel):
     role: str
-    content: Union[str, List[MessageContent]]
+    content: str | List
 
     model_config = ConfigDict(extra="allow")
 
+
 class OpenAIChatCompletionForm(BaseModel):
     stream: bool = True
     model: str
@@ -25,6 +15,7 @@ class OpenAIChatCompletionForm(BaseModel):
 
     model_config = ConfigDict(extra="allow")
 
+
 class FilterForm(BaseModel):
     body: dict
     user: Optional[dict] = None
diff --git a/start.sh b/start.sh
index a6fd1f63..e2ddd8b4 100755
--- a/start.sh
+++ b/start.sh
@@ -8,13 +8,13 @@ PIPELINES_DIR=${PIPELINES_DIR:-./pipelines}
 reset_pipelines_dir() {
   if [ "$RESET_PIPELINES_DIR" = true ]; then
     echo "Resetting pipelines directory: $PIPELINES_DIR"
-    
+
     # Check if the directory exists
     if [ -d "$PIPELINES_DIR" ]; then
       # Remove all contents of the directory
       rm -rf "${PIPELINES_DIR:?}"/*
       echo "All contents in $PIPELINES_DIR have been removed."
-      
+
       # Optionally recreate the directory if needed
       mkdir -p "$PIPELINES_DIR"
       echo "$PIPELINES_DIR has been recreated."
@@ -87,14 +87,14 @@ install_frontmatter_requirements() {
   local file_content=$(cat "$1")
   # Extract the first triple-quoted block
   local first_block=$(echo "$file_content" | awk '/"""/{flag=!flag; if(flag) count++; if(count == 2) {exit}} flag' )
-  
+
   # Check if the block contains requirements
   local requirements=$(echo "$first_block" | grep -i 'requirements:')
-  
+
   if [ -n "$requirements" ]; then
     # Extract the requirements list
     requirements=$(echo "$requirements" | awk -F': ' '{print $2}' | tr ',' ' ' | tr -d '\r')
-    
+
     # Construct and echo the pip install command
     local pip_command="pip install $requirements"
     echo "$pip_command"
@@ -108,13 +108,14 @@ install_frontmatter_requirements() {
 
 # Check if PIPELINES_URLS environment variable is set and non-empty
 if [[ -n "$PIPELINES_URLS" ]]; then
-  pipelines_dir="./pipelines"
-  mkdir -p "$pipelines_dir"
+  if [ ! -d "$PIPELINES_DIR" ]; then
+    mkdir -p "$PIPELINES_DIR"
+  fi
 
   # Split PIPELINES_URLS by ';' and iterate over each path
   IFS=';' read -ra ADDR <<< "$PIPELINES_URLS"
   for path in "${ADDR[@]}"; do
-    download_pipelines "$path" "$pipelines_dir"
+    download_pipelines "$path" "$PIPELINES_DIR"
   done
 
-  for file in "$pipelines_dir"/*; do
+  for file in "$PIPELINES_DIR"/*; do
diff --git a/utils/pipelines/main.py b/utils/pipelines/main.py
index 2d064d90..5d335225 100644
--- a/utils/pipelines/main.py
+++ b/utils/pipelines/main.py
@@ -62,7 +62,7 @@ def pop_system_message(messages: List[dict]) -> Tuple[dict, List[dict]]:
     return get_system_message(messages), remove_system_message(messages)
 
 
-def add_or_update_system_message(content: str, messages: List[dict]):
+def add_or_update_system_message(content: str, messages: List[dict]) -> List[dict]:
     """
     Adds a new system message at the beginning of the messages list
     or updates the existing system message at the beginning.