Building an AI-Powered Workflow with AWS Bedrock, Elasticsearch, LangChain, and Pydantic
Introduction
In this blog post, we’ll explore how to integrate AWS Bedrock for generating AI-based content, store and manage the outputs with Elasticsearch, handle queries using LangChain, and ensure data integrity with Pydantic. This powerful combination offers a scalable solution for working with AI models, validating output, and making results queryable.
Overview
We’ll cover:
Using AWS Bedrock for AI Model Generation: Leveraging Bedrock's robust capabilities to generate AI responses.
Indexing AI Responses in Elasticsearch: Storing and managing AI-generated data.
Parsing and Handling Prompts with LangChain: Crafting and sending AI queries.
Validating AI Responses with Pydantic: Ensuring data structure and correctness.
Prerequisites
AWS Bedrock Access:
- An AWS account with Bedrock access enabled for the Anthropic Claude 3 models.
Elasticsearch Setup:
- Elastic Cloud or self-hosted Elasticsearch instance with Cloud ID and API Key.
Python 3.8+ Environment:
Install required libraries:
pip install langchain-core langchain-aws pydantic elasticsearch
Familiarity:
- Basic knowledge of LangChain, Pydantic, and Elasticsearch.
Ensure all components are set up for smooth integration and execution!
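Throughout the post, the Elasticsearch credentials appear as the variables elastic_cloud_id and elastic_api_key. Here is one minimal way to define them, assuming you export them as environment variables first (the environment variable names are illustrative):

import os

# Illustrative setup: read the Elastic Cloud credentials from
# environment variables exported beforehand
elastic_cloud_id = os.environ["ELASTIC_CLOUD_ID"]
elastic_api_key = os.environ["ELASTIC_API_KEY"]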
1. AWS Bedrock for AI Model Generation
AWS Bedrock provides a powerful platform for deploying and interacting with pre-trained language models. In this example, we’ll use the Claude model via AWS Bedrock to generate a summary of advancements in Agentic AI.
Here’s how you can configure and interact with the model:
from langchain_aws import ChatBedrock
# Define the Bedrock model with specific parameters (temperature, max tokens)
llm = ChatBedrock(
    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    model_kwargs=dict(
        temperature=0.1,  # Controls the randomness of the output
        max_tokens=128    # Limits the maximum number of tokens in the output
    )
)
We’re using the Claude model here with parameters for temperature (controls randomness) and token limits (max response length). This model will generate summaries or respond to queries based on the prompt we pass.
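Before wiring the model into a chain, it helps to sanity-check the connection with a direct call. A minimal sketch, assuming your AWS credentials and region are already configured (for example via the AWS CLI or environment variables):

# Quick sanity check: send a single prompt straight to Bedrock
message = llm.invoke("In one sentence, what is Agentic AI?")

# ChatBedrock returns a chat message; the generated text is in .content
print(message.content)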
2. Storing Responses in Elasticsearch
Once we have AI-generated data, we need a reliable way to store it. Elasticsearch is a powerful tool for indexing and querying large amounts of structured or unstructured data.
Here’s how we can configure and store the model’s output in Elasticsearch:
from elasticsearch import Elasticsearch
from datetime import datetime
# Set up Elasticsearch with cloud configuration and API key
es = Elasticsearch(
    cloud_id=elastic_cloud_id,
    api_key=elastic_api_key
)

# Prepare the AI response data for storage in Elasticsearch
doc = {
    "timestamp": datetime.now().isoformat(),
    "prompt_template": {
        "input_variables": prompt.input_variables,
        "partial_variables": prompt.partial_variables,
        "template": prompt.template
    },
    "output": response  # Store model output
}

# Index the document into the Elasticsearch index 'ai-responses'
es.index(index="ai-responses", document=doc)
# Print a confirmation message
print("Response indexed in Elasticsearch successfully!")
In this example, elastic_cloud_id and elastic_api_key represent the credentials for connecting to your Elasticsearch instance (see the Prerequisites for one way to define them). The AI-generated response is indexed with a timestamp and stored in the ai-responses index.
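Elasticsearch creates the ai-responses index automatically with dynamic mappings on the first write. If you prefer explicit control over field types, you can create the index up front; the mapping below is a sketch of one reasonable choice, not something the workflow requires:

# Optional: create the index with an explicit mapping instead of
# relying on dynamic mapping at first write
es.indices.create(
    index="ai-responses",
    mappings={
        "properties": {
            "timestamp": {"type": "date"},
            "prompt_template": {"properties": {"template": {"type": "text"}}},
            "output": {
                "properties": {
                    "topic": {"type": "keyword"},
                    "summary": {"type": "text"}
                }
            }
        }
    },
)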
3. Crafting and Sending Prompts with LangChain
LangChain enables easy integration of natural language prompts with large language models (LLMs). We can structure queries in a meaningful way using prompt templates, which ensures consistency and helps in customizing the AI’s responses.
Here’s how we can set up a prompt template using LangChain:
from langchain_core.prompts import PromptTemplate
# Define a query asking for a summary of advancements in a specific topic
tech_query = "Summarize the advancements in Agentic AI."
# Create a prompt template for querying the AI model
prompt = PromptTemplate(
    template="Provide a summary for the following topic.\n{format_instructions}\n{query}\n",
    input_variables=["query"],
    partial_variables={"format_instructions": parser.get_format_instructions()},
)
This template allows us to structure the query in a consistent format, helping the model generate responses that align with the requested structure. Note that parser here is the JsonOutputParser we define in the next section; its get_format_instructions() output tells the model exactly which JSON fields to produce.
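To inspect exactly what the model will receive, you can render the template with a concrete query; the format instructions are filled in automatically from the partial variables. A quick sketch (the printed output is abbreviated here, since get_format_instructions() returns a fairly long schema description):

# Render the template with a concrete query to inspect the final prompt
rendered = prompt.format(query="Summarize the advancements in Agentic AI.")
print(rendered)
# Prints the instruction line, the JSON format instructions, and the query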
4. Validating and Parsing AI Responses with Pydantic
Ensuring that the responses from the model are well-structured and validated is crucial for maintaining data quality. Pydantic is a powerful tool for validating and parsing the AI’s output into a structured format.
Let’s define a Pydantic model to structure the AI-generated response:
from pydantic import BaseModel, Field

# Create a Pydantic model for the expected AI response structure
class TechAdvancementSummary(BaseModel):
    topic: str = Field(description="The topic under discussion, e.g., 'Agentic AI'")
    summary: str = Field(description="A summarized description of advancements for the topic")
This model ensures that every response from the AI follows a consistent structure, containing a topic and a summary.
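To see the validation in action, you can construct the model directly; a payload that is missing a required field raises a ValidationError. A minimal sketch:

from pydantic import ValidationError

# A well-formed payload validates cleanly
summary = TechAdvancementSummary(
    topic="Agentic AI",
    summary="Recent work focuses on autonomous planning and tool use."
)
print(summary.topic)  # "Agentic AI"

# A payload missing the required 'summary' field is rejected
try:
    TechAdvancementSummary(topic="Agentic AI")
except ValidationError as e:
    print(e)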
Parsing the Response with Pydantic
Now that we have our Pydantic model, we can set up a JSON output parser to validate and parse the AI response into the correct format:
import json  # For formatting the output as JSON
from langchain_core.output_parsers import JsonOutputParser
# Set up a JSON output parser to validate and parse the model response into the Pydantic structure
parser = JsonOutputParser(pydantic_object=TechAdvancementSummary)
# Create a chain that ties together the prompt, model (LLM), and parser
chain = prompt | llm | parser
# Invoke the chain to get the structured response as a dictionary
response = chain.invoke({"query": tech_query})
# Format the response as a JSON string for readability
print(json.dumps(response, indent=2))
This ensures that the model’s response is correctly parsed into a dictionary, following the structure defined by Pydantic.
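The parser also works on its own, which is handy for testing the expected structure without calling the model. A small sketch using a hand-written response string:

# Parse a raw JSON response without invoking the LLM
raw_output = '{"topic": "Agentic AI", "summary": "Models now plan and act autonomously."}'
parsed = parser.parse(raw_output)

print(parsed["topic"])  # JsonOutputParser returns a plain dict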
Full Code to Run and Store Results in Elasticsearch
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate
from pydantic import BaseModel, Field
from langchain_aws import ChatBedrock
import json  # For formatting the output as JSON
from datetime import datetime
from elasticsearch import Elasticsearch

# Define the Bedrock model with specific parameters (temperature, max tokens)
llm = ChatBedrock(
    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    model_kwargs=dict(
        temperature=0.1,  # Controls the randomness of the output
        max_tokens=128    # Limits the maximum number of tokens in the output
    )
)

# Create a Pydantic data model for the expected response structure
class TechAdvancementSummary(BaseModel):
    topic: str = Field(description="The topic under discussion, e.g., 'Agentic AI'")
    summary: str = Field(description="A summarized description of advancements for the topic")

# Define a query asking for a summary of advancements in a specific topic
tech_query = "Summarize the advancements in Agentic AI."

# Set up a JSON output parser to validate and parse the model response into the Pydantic structure
parser = JsonOutputParser(pydantic_object=TechAdvancementSummary)

# Create a prompt template to structure the query and ensure correct formatting of the output
prompt = PromptTemplate(
    template="Provide a summary for the following topic.\n{format_instructions}\n{query}\n",
    input_variables=["query"],
    partial_variables={"format_instructions": parser.get_format_instructions()},  # Format instructions for JSON
)

# Create a chain that ties together the prompt, model (LLM), and parser
chain = prompt | llm | parser

# Invoke the chain to get the structured response as a dictionary
response = chain.invoke({"query": tech_query})

# Format the response as a JSON string and print it in a readable format
print(json.dumps(response, indent=2))

# Elasticsearch setup (elastic_cloud_id and elastic_api_key are defined as shown in the Prerequisites)
es = Elasticsearch(
    cloud_id=elastic_cloud_id,
    api_key=elastic_api_key
)

# Prepare the data for Elasticsearch
doc = {
    "timestamp": datetime.now().isoformat(),
    "prompt_template": {
        "input_variables": prompt.input_variables,
        "partial_variables": prompt.partial_variables,
        "template": prompt.template
    },
    "output": response  # Store model output
}

# Index the document into Elasticsearch
es.index(index="ai-responses", document=doc)

# Display the result
print("Response indexed in Elasticsearch successfully!")
Elasticsearch Output
Log in to Elastic.co with your credentials and browse the ai-responses index to see the stored document.
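You can also confirm the write programmatically with a search against the index. A minimal sketch using the same client (newly indexed documents become searchable once the index refreshes, typically within about a second):

# Fetch the most recent response to confirm the document was indexed
results = es.search(
    index="ai-responses",
    query={"match_all": {}},
    sort=[{"timestamp": {"order": "desc"}}],
    size=1,
)

for hit in results["hits"]["hits"]:
    print(hit["_source"]["output"])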
Conclusion
By combining AWS Bedrock for powerful AI model generation, Elasticsearch for indexing and querying responses, LangChain for structured prompts, and Pydantic for data validation, we can build a robust, scalable, and structured AI-powered workflow. This setup allows us to generate, store, and validate AI responses efficiently, ensuring that the data we handle is reliable and usable.
Whether you're working with AI models in production or experimenting with different architectures, this integration of AWS Bedrock, Elasticsearch, LangChain, and Pydantic provides a seamless and reliable solution for handling AI-powered workflows.