core-crew/tasks.py
2024-06-15 17:32:58 -04:00

417 lines
16 KiB
Python

from celery import Celery
from crewai import Process, Agent, Task, Crew, tasks
from crewai_tools import SerperDevTool, ScrapeWebsiteTool, SeleniumScrapingTool
from crewai_tools import BaseTool
from tools import MockTool
from langchain_openai import ChatOpenAI
from pymongo import MongoClient
import langchain_core
import html
import os
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
client = MongoClient("mongodb+srv://maheshkommareddi:Yu2L6pQKyJgcTb9a@cluster0.qadl40g.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0")
db = client.content_generation
# Initialize Celery
app = Celery('tasks', broker='amqp://guest:guest@localhost:5672//')
# Load environment variables
os.environ["OPENAI_API_KEY"] = "sk-kkk"
os.environ["OPENAI_MODEL_NAME"] = "anthropic.claude-3-sonnet-20240229-v1:0"
os.environ["OPENAI_API_BASE"] = "http://chat.the.mk:1337"
# Initialize LLM
llm = ChatOpenAI(
model="anthropic.claude-3-sonnet-20240229-v1:0",
base_url="http://chat.the.mk:1337",
temperature=0.1
)
# Example usage
order_details = {
'customer_id': 1234,
'items': [
{'product_id': 'ABC123', 'quantity': 2},
{'product_id': 'XYZ456', 'quantity': 1}
]
}
customer_inquiry = {
'customer_id': 5678,
'issue': 'Order status inquiry'
}
shipping_details = {
'order_id': 9876,
'shipping_address': '123 Main St, Anytown USA'
}
order = {
'order_details': order_details,
'customer_inquiry': customer_inquiry,
'shipping_details': shipping_details
}
# Load tools
search_tool = SerperDevTool()
scrape_tool = SeleniumScrapingTool()
scrape_tool_bare = ScrapeWebsiteTool()
order_management_tool = MockTool()
order_management_tool.set_name("Order Management")
inventory_tool = MockTool()
inventory_tool.set_name("Inventory")
customer_support_tool = MockTool()
customer_support_tool.set_name("Customer Support")
order_tracking_tool = MockTool()
order_tracking_tool.set_name("Order Tracking")
supplier_management_tool = MockTool()
supplier_management_tool.set_name("Supplier Management")
shipping_tool = MockTool()
shipping_tool.set_name("Shipping")
tracking_tool = MockTool()
tracking_tool.set_name("Tracking")
# Callback functions
def order_callback(output):
if isinstance(output, langchain_core.agents.AgentFinish):
print("Order Agent finished")
elif output and output[0]:
print(output[0])
if output[0][0]:
print(output[0][0].log)
def customer_callback(output):
if isinstance(output, langchain_core.agents.AgentFinish):
print("Customer Agent finished")
elif output and output[0]:
print(output[0])
if output[0][0]:
print(output[0][0].log)
def inventory_callback(output):
if isinstance(output, langchain_core.agents.AgentFinish):
print("Inventory Agent finished")
elif output and output[0]:
print(output[0])
if output[0][0]:
print(output[0][0].log)
def logistics_callback(output):
if isinstance(output, langchain_core.agents.AgentFinish):
print("Logistics Agent finished")
elif output and output[0]:
print(output[0])
if output[0][0]:
print(output[0][0].log)
# Define the Order Agent
order_agent = Agent(
role='Order Manager',
goal='Manage and process customer orders efficiently',
backstory=(
"You are the Order Manager at an e-commerce company."
"Your responsibilities include receiving and processing customer orders,"
"updating inventory levels, and coordinating with the logistics team for shipping."
"You have access to the company's order management system and inventory database."
),
tools=[order_management_tool, inventory_tool],
max_rpm=100,
step_callback=order_callback
)
order_task = Task(
description=(
"Process the incoming customer order."
"Update the inventory levels accordingly."
"Coordinate with the logistics team for shipping."
),
expected_output="Order processed successfully, inventory updated, and shipping coordinated.",
agent=order_agent
)
# Define the Customer Agent
customer_agent = Agent(
role='Customer Service Representative',
goal='Provide excellent customer support and resolve inquiries',
backstory=(
"You are a Customer Service Representative at an e-commerce company."
"Your role is to assist customers with their questions, concerns, and complaints."
"You have access to customer order history, product information, and company policies."
),
tools=[customer_support_tool, order_tracking_tool],
max_rpm=100,
step_callback=customer_callback
)
customer_task = Task(
description=(
"Respond to the customer inquiry or complaint."
"Provide relevant information, track order status, or escalate the issue if necessary."
),
expected_output="Customer inquiry resolved or escalated appropriately.",
agent=customer_agent
)
# Define the Inventory Agent
inventory_agent = Agent(
role='Inventory Manager',
goal='Maintain accurate inventory levels and optimize stock',
backstory=(
"You are the Inventory Manager at an e-commerce company."
"Your responsibilities include monitoring inventory levels, reordering products,"
"and ensuring efficient stock management."
"You have access to the company's inventory database and supplier information."
),
tools=[inventory_tool, supplier_management_tool],
max_rpm=100,
step_callback=inventory_callback
)
inventory_task = Task(
description=(
"Monitor inventory levels and reorder products as needed."
"Coordinate with suppliers for timely restocking."
"Optimize stock levels based on demand and sales data."
),
expected_output="Inventory levels optimized, and restocking coordinated with suppliers.",
agent=inventory_agent
)
# Define the Logistics Agent
logistics_agent = Agent(
role='Logistics Coordinator',
goal='Ensure efficient and timely shipping of orders',
backstory=(
"You are the Logistics Coordinator at an e-commerce company."
"Your role is to manage the shipping process, coordinate with carriers,"
"and track shipments to ensure timely delivery."
"You have access to the company's order management system and carrier integrations."
),
tools=[shipping_tool, tracking_tool],
max_rpm=100,
step_callback=logistics_callback
)
logistics_task = Task(
description=(
"Coordinate the shipping of the processed order."
"Select the appropriate carrier and shipping method."
"Track the shipment and provide updates to the customer."
),
expected_output="Order shipped successfully, and tracking information provided to the customer.",
agent=logistics_agent
)
# Update the Company class
class Company:
def __init__(self):
self.order_agent = order_agent
self.customer_agent = customer_agent
self.inventory_agent = inventory_agent
self.logistics_agent = logistics_agent
self.order_crew = Crew(agents=[order_agent], tasks=[order_task])
self.customer_crew = Crew(agents=[customer_agent], tasks=[customer_task])
self.inventory_crew = Crew(agents=[inventory_agent], tasks=[inventory_task])
self.logistics_crew = Crew(agents=[logistics_agent], tasks=[logistics_task])
def process_order(self, order):
# Execute the order crew with the provided order details
order_result = self.order_crew.kickoff(order_task, order)
return order_result
def handle_customer_inquiry(self, inquiry):
# Execute the customer crew with the provided inquiry details
inquiry_result = self.customer_crew.kickoff(customer_task, inquiry)
return inquiry_result
def manage_inventory(self):
# Execute the inventory crew
inventory_result = self.inventory_crew.kickoff(inventory_task)
return inventory_result
def ship_order(self, order):
# Execute the logistics crew with the provided order details
shipping_result = self.logistics_crew.kickoff(logistics_task, order)
return shipping_result
company = Company()
class ManagerTool(BaseTool):
name: str | None = "Manager Tool"
description: str | None = "A tool for the Manager Agent to coordinate other agents and handle business processes."
# name = "Manager Tool"
# description = "A tool for the Manager Agent to coordinate other agents and handle business processes."
def _run(self, **kwargs) -> str:
# Parse the argument to determine the requested action
if len(kwargs) > 0:
action = kwargs[0]
if action == "process_order":
order_details = kwargs.get('order_details', order['order_details'])
result = company.process_order(order_details)
elif action == "handle_inquiry":
customer_inquiry = kwargs.get('customer_inquiry', order['customer_inquiry'])
result = company.handle_customer_inquiry(customer_inquiry)
elif action == "manage_inventory":
result = company.manage_inventory()
elif action == "ship_order":
shipping_details = kwargs.get('shipping_details', order['shipping_details'])
result = company.ship_order(shipping_details)
else:
result = f"The orders and status is: {order}"
return str(result)
return f"I understand. I have done what is necessary."
# Define the Manager Agent
manager_agent = Agent(
role="Business Operations Manager",
goal="Coordinate and manage business processes efficiently",
backstory=(
"You are the Business Operations Manager at an e-commerce company."
"Your role is to oversee and coordinate various business processes,"
"including order processing, customer support, inventory management, and logistics."
"You have access to a tool that allows you to delegate tasks to specialized agents."
"The agents you can delegate to are order_agent, customer_agent, inventory_agent, and logistics_agent"
),
tools=[ManagerTool()],
max_rpm=100,
)
manager_task = Task(
description="Manage and coordinate the appropriate business processes based on the given input.",
expected_output="Business processes executed successfully.",
agent=manager_agent,
)
project_crew = Crew(
agents=[manager_agent, order_agent, customer_agent, inventory_agent, logistics_agent],
tasks=[manager_task, order_task, customer_task, inventory_task, logistics_task],
manager_llm=ChatOpenAI(
model="amazon-embeddings",
base_url="http://chat.the.mk:1337",
temperature=0.1
), # Mandatory for hierarchical process
process=Process.hierarchical, # Specifies the hierarchical management approach
# memory=True, # Enable memory usage for enhanced task execution
)
# Pass the order_details, customer_inquiry, and shipping_details as arguments to the kickoff method
project_crew.kickoff()
# Function to update task status in MongoDB
def update_task_status(task_id, status, message):
try:
db.task_updates.insert_one({"task_id": task_id, "status": status, "message": message})
logger.info(f"Updated task status: {task_id}, {status}, {message}")
except Exception as e:
logger.error(f"Error updating task status: {e}")
# Define tasks
@app.task
def generate_content(agenda):
def researcher_callback(output):
if isinstance(output, langchain_core.agents.AgentFinish):
update_task_status(app.current_task.request.id, f"researcher", "Agent finished")
elif output and output[0]:
print(output[0])
if output[0][0]:
update_task_status(app.current_task.request.id, f"researcher", output[0][0].log)
def writer_callback(output):
if isinstance(output, langchain_core.agents.AgentFinish):
update_task_status(app.current_task.request.id, f"writer", "Agent finished")
elif output and output[0]:
print(output[0])
if output[0][0]:
update_task_status(app.current_task.request.id, f"writer", output[0][0].log)
researcher = Agent(
role='Senior Research Analyst',
goal='Find way to explain ' + agenda,
backstory=(
"You are a Senior Research Analyst at a leading tech think tank."
f"Your expertise lies in identifying {agenda}."
"You have a knack for dissecting complex data and presenting actionable insights."
"Always search the web first and make the determination for the best 4 Links, but exclude PDFs"
"For any web searches, be sure to scrape the website content from the Link in the search"
),
verbose=True,
allow_delegation=False,
tools=[search_tool, scrape_tool, scrape_tool_bare],
max_rpm=100,
step_callback=researcher_callback
)
writer = Agent(
role='Tech Content Strategist',
goal='Craft compelling content on ' + agenda,
backstory=(
"You are a renowned Tech Content Strategist, known for your insightful and engaging articles on science and innovation."
"With a deep understanding of the tech industry, you transform complex concepts into compelling narratives."
"For any web searches, be sure to scrape the website content from the Link in the search, but exclude PDFs"
),
verbose=True,
allow_delegation=True,
tools=[search_tool, scrape_tool, scrape_tool_bare],
cache=False, # Disable cache for this agent
step_callback=writer_callback
)
task1 = Task(
description=(
f"Conduct a comprehensive analysis of the latest in {agenda}"
"Identify key trends, breakthrough technologies, and potential industry impacts."
"Compile your findings in a detailed report and include references and links to the source material."
),
expected_output=f"A comprehensive full report on {agenda} in 2024, leave nothing out",
agent=researcher,
)
task2 = Task(
description=(
f"Using the insights from the researcher's report, develop an engaging blog post that highlights the most significant {agenda} ideas."
"Your post should be informative yet accessible, catering to a tech-savvy audience."
"Aim for a narrative that captures the essence of these breakthroughs and their implications for the future."
"Keep asking for research and revise until the minimum 5000 words are met"
"Include the research in the entirety along with the resulting report"
"Include at least five links to external pages or PDFs with an appropriate anchor tag in the final report"
),
expected_output=f"A compelling ten paragraphs blog post formatted as html to place inside the body tag with headings, subheadings, and a main thesis about the latest {agenda}",
agent=writer
)
# Define the main callback for the crew
def main_callback(output: tasks.task_output.TaskOutput):
update_task_status(app.current_task.request.id, 1, output.description)
# Instantiate your crew with a sequential process
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2],
verbose=4,
task_callback=main_callback
)
result = crew.kickoff()
# Save the result to MongoDB Atlas
content = {
"agenda": agenda,
"report": html.unescape(result),
"blog_post": html.unescape(result)
}
db.content.insert_one(content)
return result