Learn how to deploy your agent as a service, API, or integrated application
Once you've developed your AI agent, the next critical step is deploying it for real-world use. The deployment approach you choose depends on your specific requirements, target users, and technical constraints. This guide covers the most common deployment options and best practices for each.
Turning your AI agent into a web service allows users to interact with it through HTTP requests. This is one of the most versatile deployment options.
For Python-based agents, Flask and FastAPI are popular frameworks for creating web services.
# Basic Flask implementation for an AI agent API
from flask import Flask, request, jsonify
import os
from langchain.agents import initialize_agent, Tool
from langchain.llms import OpenAI
app = Flask(__name__)
# Initialize your agent (assuming you've built it with LangChain)
llm = OpenAI(temperature=0)
tools = [
# Your agent's tools here
]
agent = initialize_agent(tools, llm, agent="zero-shot-react-description", verbose=True)
@app.route('/api/agent', methods=['POST'])
def query_agent():
data = request.json
if not data or 'query' not in data:
return jsonify({'error': 'Query parameter is required'}), 400
try:
response = agent.run(data['query'])
return jsonify({'response': response})
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run(debug=False, host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))
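With the server running locally, you can sanity-check the endpoint from any HTTP client; for example, with Python's requests library (the URL assumes the default port configured above, and the sample query is only a placeholder):
# Quick local test of the /api/agent endpoint defined above
import requests

resp = requests.post(
    'http://localhost:8080/api/agent',
    json={'query': 'What can you help me with?'},
    timeout=60,
)
print(resp.status_code, resp.json())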
Several cloud platforms provide easy deployment options for web services. For example, to deploy to Google Cloud Run:
1. Create a Dockerfile in your project directory:
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
ENV PORT=8080
CMD exec gunicorn --bind :$PORT app:app
2. Build and deploy with Google Cloud CLI:
gcloud builds submit --tag gcr.io/PROJECT_ID/ai-agent
gcloud run deploy ai-agent --image gcr.io/PROJECT_ID/ai-agent --platform managed
A RESTful API provides a standardized interface for interacting with your agent, making it easily accessible from various clients and platforms.
Consider putting an API gateway (such as Amazon API Gateway, Google Cloud Endpoints, or Kong) in front of your service to handle authentication, rate limiting, and monitoring.
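If a full gateway is overkill for your project, you can enforce similar checks at the application level. Below is a minimal sketch that plugs into the Flask app shown earlier; the header name, key source, and limits are assumptions, and a production setup should prefer a gateway or a maintained library such as flask-limiter.
# Minimal API-key check and in-memory rate limiter for the Flask app above
import os
import time
from collections import defaultdict, deque
from flask import request, jsonify

VALID_API_KEYS = set(os.environ.get('AGENT_API_KEYS', '').split(','))  # assumed env var
WINDOW_SECONDS = 60
MAX_REQUESTS_PER_WINDOW = 30
_request_log = defaultdict(deque)  # api_key -> timestamps of recent requests

@app.before_request
def authenticate_and_throttle():
    api_key = request.headers.get('X-API-Key')  # header name is an assumption
    if api_key not in VALID_API_KEYS:
        return jsonify({'error': 'Invalid or missing API key'}), 401
    now = time.time()
    log = _request_log[api_key]
    while log and now - log[0] > WINDOW_SECONDS:
        log.popleft()  # drop timestamps that fell outside the rate window
    if len(log) >= MAX_REQUESTS_PER_WINDOW:
        return jsonify({'error': 'Rate limit exceeded'}), 429
    log.append(now)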
Documenting your API with OpenAPI (formerly Swagger) helps users understand how to interact with your agent:
{
"openapi": "3.0.0",
"info": {
"title": "AI Agent API",
"description": "API for interacting with an AI agent",
"version": "1.0.0"
},
"paths": {
"/api/agent": {
"post": {
"summary": "Query the AI agent",
"requestBody": {
"required": true,
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The query to process"
}
},
"required": ["query"]
}
}
}
},
"responses": {
"200": {
"description": "Successful response",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"response": {
"type": "string",
"description": "The agent's response"
}
}
}
}
}
},
"400": {
"description": "Bad request"
},
"500": {
"description": "Server error"
}
}
}
}
}
}
Serverless functions provide a cost-effective, scalable solution for deploying AI agents that don't require continuous availability.
# AWS Lambda function for an AI agent (Python)
import json
import os
from langchain.agents import initialize_agent, Tool
from langchain.llms import OpenAI
# Initialize your agent
llm = OpenAI(temperature=0)
tools = [
# Your agent's tools here
]
agent = initialize_agent(tools, llm, agent="zero-shot-react-description", verbose=True)
def lambda_handler(event, context):
# Extract query from the event
    body = json.loads(event.get('body') or '{}')  # handles missing or null body
query = body.get('query')
if not query:
return {
'statusCode': 400,
'body': json.dumps({'error': 'Query parameter is required'})
}
try:
# Run the agent
response = agent.run(query)
return {
'statusCode': 200,
'body': json.dumps({'response': response})
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
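Before wiring the function to API Gateway, you can exercise the handler locally with a synthetic proxy-integration event. A small check appended to the same module (the event shape below is just the minimal subset the handler reads):
# Local sanity check with a synthetic API Gateway proxy-style event
if __name__ == "__main__":
    fake_event = {"body": json.dumps({"query": "Hello, agent"})}
    print(lambda_handler(fake_event, context=None))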
Integrating your AI agent into desktop or mobile applications provides a native user experience. Note that the example below still calls a remote agent API; genuinely offline use would require bundling a local model with the app.
// Example: Calling an AI agent API from a React Native app
import React, { useState } from 'react';
import { View, TextInput, Button, Text, StyleSheet } from 'react-native';
export default function AgentScreen() {
const [query, setQuery] = useState('');
const [response, setResponse] = useState('');
const [loading, setLoading] = useState(false);
const queryAgent = async () => {
if (!query.trim()) return;
setLoading(true);
try {
const result = await fetch('https://your-agent-api.com/api/agent', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer YOUR_API_KEY'
},
body: JSON.stringify({ query: query })
});
const data = await result.json();
setResponse(data.response);
} catch (error) {
setResponse(`Error: ${error.message}`);
} finally {
setLoading(false);
}
};
return (
<View style={styles.container}>
<TextInput
style={styles.input}
value={query}
onChangeText={setQuery}
placeholder="Ask the AI agent..."
/>
<Button title={loading ? "Loading..." : "Send"} onPress={queryAgent} disabled={loading} />
{response ? (
<View style={styles.responseContainer}>
<Text style={styles.responseTitle}>Response:</Text>
<Text style={styles.responseText}>{response}</Text>
</View>
) : null}
</View>
);
}

// Minimal styles for the component above (values are illustrative placeholders)
const styles = StyleSheet.create({
  container: { flex: 1, padding: 16 },
  input: { borderWidth: 1, borderColor: '#ccc', borderRadius: 4, padding: 8, marginBottom: 12 },
  responseContainer: { marginTop: 16 },
  responseTitle: { fontWeight: 'bold', marginBottom: 4 },
  responseText: { fontSize: 16 },
});
Integrating your AI agent into a web application front-end allows users to interact with it through a familiar interface. This approach works well for customer-facing applications.
For React applications, you can create a reusable AI agent component:
// AIAgentChat.jsx - A React component for interacting with your AI agent
import React, { useState, useEffect, useRef } from 'react';
import './AIAgentChat.css';
const AIAgentChat = ({ apiEndpoint, apiKey, initialContext = {} }) => {
const [messages, setMessages] = useState([]);
const [input, setInput] = useState('');
const [isLoading, setIsLoading] = useState(false);
const messagesEndRef = useRef(null);
// Auto-scroll to bottom of chat
useEffect(() => {
scrollToBottom();
}, [messages]);
const scrollToBottom = () => {
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
};
const handleSubmit = async (e) => {
e.preventDefault();
if (!input.trim()) return;
// Add user message to chat
const userMessage = { role: 'user', content: input };
setMessages(prev => [...prev, userMessage]);
setInput('');
setIsLoading(true);
try {
// Send request to AI agent API
const response = await fetch(apiEndpoint, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${apiKey}`
},
body: JSON.stringify({
messages: [...messages, userMessage],
context: initialContext
})
});
if (!response.ok) {
throw new Error('API request failed');
}
const data = await response.json();
// Add agent response to chat
setMessages(prev => [...prev, { role: 'assistant', content: data.response }]);
} catch (error) {
console.error('Error querying AI agent:', error);
setMessages(prev => [...prev, {
role: 'system',
content: 'Sorry, I encountered an error. Please try again later.'
}]);
} finally {
setIsLoading(false);
}
};
return (
<div className="ai-agent-chat">
<div className="chat-messages">
{messages.length === 0 ? (
<div className="empty-state">
<p>How can I help you today?</p>
</div>
) : (
messages.map((msg, index) => (
<div key={index} className={`message ${msg.role}`}>
<div className="message-content">{msg.content}</div>
</div>
))
)}
{isLoading && (
<div className="message assistant loading">
<div className="typing-indicator">
<span></span><span></span><span></span>
</div>
</div>
)}
<div ref={messagesEndRef} />
</div>
<form className="chat-input" onSubmit={handleSubmit}>
<input
type="text"
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder="Type your message..."
disabled={isLoading}
/>
<button type="submit" disabled={isLoading || !input.trim()}>
Send
</button>
</form>
</div>
);
};
export default AIAgentChat;
// AIAgent.vue - A Vue.js component for AI agent integration
<template>
<div class="ai-agent-container">
<div class="agent-header">
<h3>AI Assistant</h3>
<button @click="toggleChat" class="toggle-btn">
{{ isOpen ? 'Close' : 'Open' }}
</button>
</div>
<div v-if="isOpen" class="agent-body">
<div class="messages" ref="messagesContainer">
<div v-for="(message, index) in messages" :key="index"
:class="['message', message.sender]">
<div class="message-content">{{ message.text }}</div>
</div>
<div v-if="isProcessing" class="message agent processing">
<div class="dots">
<span></span><span></span><span></span>
</div>
</div>
</div>
<div class="input-area">
<input
v-model="userInput"
@keyup.enter="sendMessage"
placeholder="Ask me anything..."
:disabled="isProcessing"
/>
<button @click="sendMessage" :disabled="isProcessing || !userInput.trim()">
Send
</button>
</div>
</div>
</div>
</template>
<script>
export default {
name: 'AIAgent',
props: {
apiUrl: {
type: String,
required: true
},
apiKey: {
type: String,
required: true
}
},
data() {
return {
isOpen: false,
messages: [],
userInput: '',
isProcessing: false
}
},
methods: {
toggleChat() {
this.isOpen = !this.isOpen;
if (this.isOpen && this.messages.length === 0) {
this.messages.push({
sender: 'agent',
text: 'Hello! How can I assist you today?'
});
}
},
async sendMessage() {
if (!this.userInput.trim() || this.isProcessing) return;
// Add user message
this.messages.push({
sender: 'user',
text: this.userInput
});
const query = this.userInput;
this.userInput = '';
this.isProcessing = true;
// Scroll to bottom
this.$nextTick(() => {
this.scrollToBottom();
});
try {
const response = await fetch(this.apiUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${this.apiKey}`
},
body: JSON.stringify({ query })
});
if (!response.ok) {
throw new Error('Failed to get response');
}
const data = await response.json();
// Add agent response
this.messages.push({
sender: 'agent',
text: data.response
});
} catch (error) {
console.error('Error:', error);
this.messages.push({
sender: 'agent',
text: 'Sorry, I encountered an error. Please try again later.'
});
} finally {
this.isProcessing = false;
this.$nextTick(() => {
this.scrollToBottom();
});
}
},
scrollToBottom() {
const container = this.$refs.messagesContainer;
container.scrollTop = container.scrollHeight;
}
}
}
</script>
For simple integration into any website, you can create an embeddable script that loads your AI agent as a chat widget:
<!-- AI Agent Embed Script -->
<script>
(function(window, document) {
// Configuration
const config = {
apiEndpoint: 'https://your-agent-api.com/api/agent',
apiKey: 'YOUR_PUBLIC_API_KEY',
position: 'bottom-right', // bottom-right, bottom-left, top-right, top-left
initialMessage: 'Hi there! How can I help you today?',
widgetTitle: 'AI Assistant',
primaryColor: '#4F46E5'
};
// Create widget container
const createWidget = () => {
const widget = document.createElement('div');
widget.id = 'ai-assistant-widget';
widget.innerHTML = `
<div class="ai-widget-container" style="position: fixed; ${config.position.includes('bottom') ? 'bottom: 20px;' : 'top: 20px;'} ${config.position.includes('right') ? 'right: 20px;' : 'left: 20px;'} z-index: 9999;">
<div class="ai-widget-chat" style="display: none; width: 350px; height: 450px; background: white; border-radius: 10px; box-shadow: 0 4px 12px rgba(0,0,0,0.15); overflow: hidden; flex-direction: column;">
<div class="ai-widget-header" style="padding: 15px; background: ${config.primaryColor}; color: white;">
<div style="font-weight: bold;">${config.widgetTitle}</div>
<button class="ai-widget-close" style="background: none; border: none; color: white; cursor: pointer;">×</button>
</div>
<div class="ai-widget-messages" style="flex: 1; overflow-y: auto; padding: 15px;"></div>
<div class="ai-widget-input" style="padding: 10px; border-top: 1px solid #eee; display: flex;">
<input type="text" placeholder="Type your message..." style="flex: 1; padding: 8px; border: 1px solid #ddd; border-radius: 4px;">
<button style="margin-left: 8px; background: ${config.primaryColor}; color: white; border: none; border-radius: 4px; padding: 8px 12px; cursor: pointer;">Send</button>
</div>
</div>
<button class="ai-widget-button" style="width: 60px; height: 60px; border-radius: 50%; background: ${config.primaryColor}; color: white; border: none; box-shadow: 0 2px 8px rgba(0,0,0,0.15); cursor: pointer; display: flex; align-items: center; justify-content: center;">
<svg width="24" height="24" viewBox="0 0 24 24" fill="none" xmlns="http://www.w3.org/2000/svg">
<path d="M12 2C6.48 2 2 6.48 2 12C2 17.52 6.48 22 12 22C17.52 22 22 17.52 22 12C22 6.48 17.52 2 12 2ZM13 17H11V15H13V17ZM13 13H11V7H13V13Z" fill="white"/>
</svg>
</button>
</div>
`;
document.body.appendChild(widget);
// Initialize widget behavior
initWidgetBehavior(widget);
};
// Initialize widget behavior
const initWidgetBehavior = (widget) => {
const chatButton = widget.querySelector('.ai-widget-button');
const chatWindow = widget.querySelector('.ai-widget-chat');
const closeButton = widget.querySelector('.ai-widget-close');
const messagesContainer = widget.querySelector('.ai-widget-messages');
const inputField = widget.querySelector('input');
const sendButton = widget.querySelector('.ai-widget-input button');
// Toggle chat window
chatButton.addEventListener('click', () => {
chatWindow.style.display = 'flex';
chatButton.style.display = 'none';
// Add initial message if chat is empty
if (messagesContainer.children.length === 0) {
addMessage('assistant', config.initialMessage);
}
});
// Close chat window
closeButton.addEventListener('click', () => {
chatWindow.style.display = 'none';
chatButton.style.display = 'flex';
});
// Send message
const sendMessage = () => {
const message = inputField.value.trim();
if (!message) return;
addMessage('user', message);
inputField.value = '';
// Call API with message
fetch(config.apiEndpoint, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${config.apiKey}`
},
body: JSON.stringify({ query: message })
})
.then(response => response.json())
.then(data => {
addMessage('assistant', data.response);
})
.catch(error => {
console.error('Error:', error);
addMessage('assistant', 'Sorry, I encountered an error. Please try again.');
});
};
// Add message to chat
const addMessage = (role, text) => {
const messageEl = document.createElement('div');
messageEl.className = `ai-message ${role}`;
messageEl.style.marginBottom = '10px';
messageEl.style.padding = '8px 12px';
messageEl.style.borderRadius = '18px';
messageEl.style.maxWidth = '80%';
messageEl.style.alignSelf = role === 'user' ? 'flex-end' : 'flex-start';
messageEl.style.background = role === 'user' ? config.primaryColor : '#f1f1f1';
messageEl.style.color = role === 'user' ? 'white' : 'black';
messageEl.textContent = text;
messagesContainer.appendChild(messageEl);
messagesContainer.scrollTop = messagesContainer.scrollHeight;
};
// Event listeners for sending messages
sendButton.addEventListener('click', sendMessage);
inputField.addEventListener('keypress', (e) => {
if (e.key === 'Enter') sendMessage();
});
};
// Initialize when DOM is ready
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', createWidget);
} else {
createWidget();
}
})(window, document);
</script>
Docker containers provide a consistent environment for deploying your AI agent across different platforms and infrastructures.
# Dockerfile for an AI agent
FROM python:3.9-slim
# Set working directory
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Expose port
EXPOSE 8000
# Set environment variables
ENV MODEL_PATH=/app/models
ENV LOG_LEVEL=INFO
# Command to run the application
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
For production deployments, consider using container orchestration tools such as Kubernetes, Docker Swarm, or Amazon ECS to manage replicas, rolling updates, and health checks.
When deploying AI agents, the backend architecture significantly impacts scalability, performance, and maintenance. Here are some common architectural patterns suitable for AI agent deployments.
Breaking your AI agent system down into independent microservices can improve scalability and simplify maintenance:
# message_broker.py - Example RabbitMQ integration for microservices
import pika
import json
import threading
import time
import uuid
class RabbitMQClient:
"""Client for handling async communication between microservices"""
def __init__(self, host='localhost'):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
self.channel = self.connection.channel()
# Set up response queue
result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self._on_response,
auto_ack=True
)
self.responses = {}
self._start_consuming()
def _start_consuming(self):
"""Start consuming messages in a separate thread"""
self.thread = threading.Thread(target=self._consume)
self.thread.daemon = True
self.thread.start()
def _consume(self):
"""Consume messages from RabbitMQ"""
self.channel.start_consuming()
def _on_response(self, ch, method, props, body):
"""Handle incoming responses"""
if props.correlation_id in self.responses:
self.responses[props.correlation_id] = body
def call_service(self, service_queue, message, timeout=30):
"""Call a microservice and wait for the response"""
correlation_id = str(uuid.uuid4())
self.responses[correlation_id] = None
self.channel.basic_publish(
exchange='',
routing_key=service_queue,
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=correlation_id,
),
body=json.dumps(message)
)
# Wait for response with timeout
start_time = time.time()
while self.responses[correlation_id] is None:
if time.time() - start_time > timeout:
del self.responses[correlation_id]
raise TimeoutError(f"Service {service_queue} timed out")
time.sleep(0.1)
response = self.responses[correlation_id]
del self.responses[correlation_id]
return json.loads(response)
Event-driven architectures are well-suited for AI agents that need to process and respond to asynchronous events such as incoming messages, webhooks, or scheduled jobs.
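As a minimal illustration of the pattern, the sketch below subscribes to an event stream via Redis pub/sub and invokes the agent only for relevant events; the channel names and event schema are assumptions, not a fixed convention:
# event_consumer.py - Event-driven sketch using Redis pub/sub (channel names
# and event schema are illustrative assumptions)
import json
import redis

def handle_event(event, agent):
    """Route an incoming event to the agent if it needs a response."""
    if event.get('type') == 'user_message':
        return agent.run(event['payload']['text'])
    return None  # ignore event types the agent doesn't handle

def main(agent):
    client = redis.Redis()
    pubsub = client.pubsub()
    pubsub.subscribe('agent:events')
    for message in pubsub.listen():
        if message['type'] != 'message':
            continue  # skip subscribe confirmations
        event = json.loads(message['data'])
        result = handle_event(event, agent)
        if result is not None:
            client.publish('agent:responses', json.dumps(
                {'event_id': event.get('id'), 'response': result}))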
Choosing the right database strategy is crucial for maintaining agent state, storing conversation history, and supporting caching.
# conversation_store.py - MongoDB integration for conversation history
from pymongo import MongoClient
import uuid
from datetime import datetime
class ConversationStore:
"""Manages conversation history for AI agents using MongoDB"""
def __init__(self, connection_string):
self.client = MongoClient(connection_string)
self.db = self.client.agent_database
self.conversations = self.db.conversations
# Create TTL index for automatic cleanup of old conversations
self.conversations.create_index("last_updated", expireAfterSeconds=604800) # 7 days
def create_conversation(self, user_id):
"""Create a new conversation"""
conversation_id = str(uuid.uuid4())
conversation = {
"_id": conversation_id,
"user_id": user_id,
"messages": [],
"metadata": {},
"created_at": datetime.utcnow(),
"last_updated": datetime.utcnow()
}
self.conversations.insert_one(conversation)
return conversation_id
def add_message(self, conversation_id, role, content, metadata=None):
"""Add a message to the conversation history"""
message = {
"role": role, # 'user' or 'assistant'
"content": content,
"timestamp": datetime.utcnow(),
"metadata": metadata or {}
}
result = self.conversations.update_one(
{"_id": conversation_id},
{
"$push": {"messages": message},
"$set": {"last_updated": datetime.utcnow()}
}
)
return result.modified_count > 0
def get_conversation_history(self, conversation_id, limit=None):
"""Retrieve conversation history"""
conversation = self.conversations.find_one({"_id": conversation_id})
if not conversation:
return None
messages = conversation["messages"]
if limit:
messages = messages[-limit:]
return messages
def update_metadata(self, conversation_id, metadata):
"""Update conversation metadata"""
result = self.conversations.update_one(
{"_id": conversation_id},
{
"$set": {
"metadata": metadata,
"last_updated": datetime.utcnow()
}
}
)
return result.modified_count > 0
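A typical request cycle then records both sides of the exchange (the connection string below is a placeholder):
# Example usage of ConversationStore (connection string is a placeholder)
store = ConversationStore("mongodb://localhost:27017")
conv_id = store.create_conversation(user_id="user-123")
store.add_message(conv_id, "user", "What's the status of my order?")
store.add_message(conv_id, "assistant", "Your order shipped yesterday.")
history = store.get_conversation_history(conv_id, limit=10)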
Retrieval-Augmented Generation (RAG) is a common pattern in AI agents that need access to specific knowledge bases:
# knowledge_base.py - Vector database integration for RAG
import pinecone
from sentence_transformers import SentenceTransformer
import uuid
class KnowledgeBase:
"""Vector database integration for RAG using Pinecone"""
def __init__(self, api_key, environment, index_name):
# Initialize Pinecone
pinecone.init(api_key=api_key, environment=environment)
# Check if index exists, create if it doesn't
if index_name not in pinecone.list_indexes():
pinecone.create_index(name=index_name, dimension=384, metric="cosine")
self.index = pinecone.Index(index_name)
# Initialize embedding model
self.embedder = SentenceTransformer('all-MiniLM-L6-v2')
def add_document(self, doc_id, text, metadata=None):
"""Add a document to the knowledge base"""
# Create embeddings
embedding = self.embedder.encode(text).tolist()
# Upsert into Pinecone
self.index.upsert(vectors=[(doc_id, embedding, metadata or {})])
return doc_id
def query(self, question, top_k=3):
"""Query the knowledge base for relevant documents"""
# Create query embedding
query_embedding = self.embedder.encode(question).tolist()
# Query Pinecone
results = self.index.query(
vector=query_embedding,
top_k=top_k,
include_metadata=True
)
return results["matches"]
def batch_add_documents(self, documents):
"""Add multiple documents at once"""
vectors = []
for doc in documents:
doc_id = doc.get("id", str(uuid.uuid4()))
text = doc["text"]
metadata = doc.get("metadata", {})
# Create embeddings
embedding = self.embedder.encode(text).tolist()
vectors.append((doc_id, embedding, metadata))
# Batch upsert to Pinecone
self.index.upsert(vectors=vectors)
return len(vectors)
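Usage is then a matter of indexing documents once and querying at request time (the credentials and environment string below are placeholders):
# Example usage of KnowledgeBase (credentials and environment are placeholders)
kb = KnowledgeBase(api_key="YOUR_PINECONE_KEY", environment="us-west1-gcp",
                   index_name="agent-docs")
kb.add_document("doc-1", "Returns are accepted within 30 days of delivery.",
                {"source": "policy.md"})
matches = kb.query("How long do customers have to return items?")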
Proper monitoring and maintenance ensure your deployed agent remains reliable, secure, and performant over time.
Implement comprehensive logging to troubleshoot issues and understand agent behavior:
import logging
import time
from contextlib import contextmanager
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("agent.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger("ai_agent")
@contextmanager
def timer(name):
"""Context manager for timing code execution."""
start = time.time()
yield
elapsed = time.time() - start
logger.info(f"{name} took {elapsed:.2f} seconds")
def process_query(query, user_id):
"""Process a query with timing and logging."""
logger.info(f"Received query from user {user_id}: {query[:50]}...")
try:
with timer("Agent processing"):
# Your agent processing code here
response = agent.run(query)
logger.info(f"Successfully processed query from user {user_id}")
return response
except Exception as e:
logger.error(f"Error processing query from user {user_id}: {str(e)}")
raise
Implement CI/CD pipelines to safely update your agent: run automated tests on every change, deploy to a staging environment first, and keep a fast rollback path.
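For example, the pipeline can run a small smoke test against a staging deployment before promoting a build. A minimal sketch (the staging URL and probe query are assumptions):
# smoke_test.py - Post-deploy smoke test a CI/CD pipeline might run
# (staging URL and probe query are placeholder assumptions)
import os
import sys
import requests

STAGING_URL = os.environ.get('STAGING_URL', 'https://staging.example.com/api/agent')

def main():
    resp = requests.post(STAGING_URL, json={'query': 'ping'}, timeout=30)
    if resp.status_code != 200:
        print(f"FAIL: expected 200, got {resp.status_code}")
        sys.exit(1)
    if 'response' not in resp.json():
        print("FAIL: missing 'response' field in body")
        sys.exit(1)
    print("Smoke test passed")

if __name__ == '__main__':
    main()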
As your AI agent usage grows, implementing proper scaling strategies becomes crucial for maintaining performance and reliability.
Implement a queue-based architecture to handle increased load by adding more worker instances:
# worker.py - Agent worker implementation
import redis
import json
import os
import time
from langchain.agents import initialize_agent, Tool
from langchain.llms import OpenAI
class AgentWorker:
"""Worker process that consumes tasks from Redis queue"""
def __init__(self, redis_url):
self.redis = redis.from_url(redis_url)
self.request_queue = "agent:requests"
self.response_prefix = "agent:response:"
# Initialize your agent
llm = OpenAI(temperature=0)
tools = [
# Your agent's tools here
]
self.agent = initialize_agent(tools, llm, agent="zero-shot-react-description", verbose=True)
def start(self):
"""Start the worker process"""
print("Agent worker started, waiting for tasks...")
while True:
# Block until task is available, timeout after 1 second to allow for graceful shutdown
task_data = self.redis.blpop(self.request_queue, timeout=1)
if task_data is None:
continue
_, task_json = task_data
task = json.loads(task_json)
print(f"Processing task {task['task_id']}")
try:
# Process the task
result = self.agent.run(task['query'])
# Store the result
response = {
"task_id": task["task_id"],
"result": result,
"status": "completed",
"timestamp": time.time()
}
except Exception as e:
# Handle errors
response = {
"task_id": task["task_id"],
"error": str(e),
"status": "failed",
"timestamp": time.time()
}
# Save the response
response_key = f"{self.response_prefix}{task['task_id']}"
self.redis.set(response_key, json.dumps(response))
# Set expiration to avoid memory leaks (24 hours)
self.redis.expire(response_key, 86400)
# Run the worker
if __name__ == "__main__":
redis_url = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
worker = AgentWorker(redis_url)
worker.start()
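On the other side of the queue, the API process enqueues tasks and polls for results. A minimal producer sketch (the key names mirror worker.py above; the polling interval is an assumption):
# producer.py - Enqueue a task for worker.py above and wait for the result
import json
import time
import uuid
import redis

def submit_query(redis_client, query, timeout=60):
    task_id = str(uuid.uuid4())
    # Push onto the queue that the workers consume
    redis_client.rpush("agent:requests", json.dumps({"task_id": task_id, "query": query}))
    response_key = f"agent:response:{task_id}"
    deadline = time.time() + timeout
    while time.time() < deadline:
        raw = redis_client.get(response_key)  # set by the worker when done
        if raw is not None:
            return json.loads(raw)
        time.sleep(0.2)
    raise TimeoutError(f"No response for task {task_id} within {timeout}s")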
Implement proper load balancing to distribute requests efficiently across multiple agent instances:
# Example NGINX configuration for load balancing agent instances
http {
upstream agent_backend {
least_conn; # Use least connections strategy
server agent1:8000 max_fails=3 fail_timeout=30s;
server agent2:8000 max_fails=3 fail_timeout=30s;
server agent3:8000 max_fails=3 fail_timeout=30s;
}
server {
listen 80;
server_name ai-agent-api.example.com;
location / {
proxy_pass http://agent_backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# Timeout settings
proxy_connect_timeout 10s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
}
# Health check endpoint
location /health {
access_log off;
add_header Content-Type text/plain;
return 200 'OK';
}
}
}
Ensuring the security of your AI agent deployment is critical for protecting user data and maintaining trust.
Conduct periodic security assessments to identify and address vulnerabilities: audit dependencies for known CVEs, probe endpoints for injection attacks (including prompt injection), and verify that secrets never leak into logs or error messages.
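One concrete layer is validating and bounding queries before they ever reach the agent. The sketch below is illustrative only; the length limit and blocked patterns are assumptions and not a complete defense against prompt injection:
# input_guard.py - Basic request validation before queries reach the agent
# (limits and patterns are illustrative assumptions, not a complete safeguard)
import re

MAX_QUERY_LENGTH = 2000
BLOCKED_PATTERNS = [
    re.compile(r'ignore (all|previous) instructions', re.IGNORECASE),
]

def validate_query(query):
    """Return (ok, error_message) for an incoming query string."""
    if not isinstance(query, str) or not query.strip():
        return False, "Query must be a non-empty string"
    if len(query) > MAX_QUERY_LENGTH:
        return False, f"Query exceeds {MAX_QUERY_LENGTH} characters"
    for pattern in BLOCKED_PATTERNS:
        if pattern.search(query):
            return False, "Query contains disallowed content"
    return True, None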
Managing costs is essential for sustainable AI agent deployments, especially when using commercial LLM APIs. Caching responses to repeated queries is one of the simplest savings; the example below returns a cached answer instead of re-invoking the LLM.
import hashlib
import json
import redis
class CachedAgent:
def __init__(self, agent, redis_client, cache_ttl=3600):
self.agent = agent
self.redis = redis_client
self.cache_ttl = cache_ttl
def get_cache_key(self, query):
"""Generate a deterministic cache key from the query."""
query_hash = hashlib.md5(query.encode()).hexdigest()
return f"agent_cache:{query_hash}"
def run(self, query):
"""Run the agent with caching."""
cache_key = self.get_cache_key(query)
# Try to get from cache first
cached_response = self.redis.get(cache_key)
if cached_response:
return json.loads(cached_response)
# If not in cache, run the agent
response = self.agent.run(query)
# Store in cache
self.redis.setex(
cache_key,
self.cache_ttl,
json.dumps(response)
)
return response
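Wrapping the LangChain agent from the earlier examples, usage looks like this (the TTL is an assumption to tune for your traffic):
# Example usage of CachedAgent; `agent` is the LangChain agent built earlier
import redis

cached = CachedAgent(agent, redis.Redis(), cache_ttl=1800)
print(cached.run("What are your shipping options?"))  # first call invokes the LLM
print(cached.run("What are your shipping options?"))  # repeat is served from cache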
These patterns come together in practice: a medium-sized e-commerce company deployed an AI agent to augment its customer support team, and a research institution deployed one to help researchers quickly access and analyze scientific literature.
In summary, follow these deployment best practices:
1. Security: implement robust authentication and data protection.
2. Monitoring: track performance, errors, and user satisfaction.
3. Cost optimization: use caching and efficient model selection.
4. Scalability: design your architecture for future growth.