core-crew/tasks.py
Mahesh Kommareddi fc2c279836 Various updates
2024-06-15 13:41:25 -04:00

144 lines
5.7 KiB
Python

from celery import Celery
from crewai import Agent, Task, Crew, tasks
from crewai_tools import SerperDevTool, ScrapeWebsiteTool, SeleniumScrapingTool
from langchain_openai import ChatOpenAI
from pymongo import MongoClient
import langchain_core
import html
import os
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
client = MongoClient("mongodb+srv://maheshkommareddi:Yu2L6pQKyJgcTb9a@cluster0.qadl40g.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0")
db = client.content_generation
# Initialize Celery
app = Celery('tasks', broker='amqp://guest:guest@localhost:5672//')
# Load environment variables
os.environ["OPENAI_API_KEY"] = "sk-kkk"
os.environ["OPENAI_MODEL_NAME"] = "anthropic.claude-3-sonnet-20240229-v1:0"
os.environ["OPENAI_API_BASE"] = "http://chat.the.mk:1337"
# Initialize LLM
llm = ChatOpenAI(
model="anthropic.claude-3-sonnet-20240229-v1:0",
base_url="http://chat.the.mk:1337",
temperature=0.1
)
# Load tools
search_tool = SerperDevTool()
scrape_tool = SeleniumScrapingTool()
scrape_tool_bare = ScrapeWebsiteTool()
# Function to update task status in MongoDB
def update_task_status(task_id, status, message):
try:
db.task_updates.insert_one({"task_id": task_id, "status": status, "message": message})
logger.info(f"Updated task status: {task_id}, {status}, {message}")
except Exception as e:
logger.error(f"Error updating task status: {e}")
# Define tasks
@app.task
def generate_content(agenda):
def researcher_callback(output):
if isinstance(output, langchain_core.agents.AgentFinish):
update_task_status(app.current_task.request.id, f"researcher", "Agent finished")
elif output and output[0]:
print(output[0])
if output[0][0]:
update_task_status(app.current_task.request.id, f"researcher", output[0][0].log)
def writer_callback(output):
if isinstance(output, langchain_core.agents.AgentFinish):
update_task_status(app.current_task.request.id, f"writer", "Agent finished")
elif output and output[0]:
print(output[0])
if output[0][0]:
update_task_status(app.current_task.request.id, f"writer", output[0][0].log)
researcher = Agent(
role='Senior Research Analyst',
goal='Find way to explain ' + agenda,
backstory=(
"You are a Senior Research Analyst at a leading tech think tank."
f"Your expertise lies in identifying {agenda}."
"You have a knack for dissecting complex data and presenting actionable insights."
"Always search the web first and make the determination for the best 4 Links, but exclude PDFs"
"For any web searches, be sure to scrape the website content from the Link in the search"
),
verbose=True,
allow_delegation=False,
tools=[search_tool, scrape_tool, scrape_tool_bare],
max_rpm=100,
step_callback=researcher_callback
)
writer = Agent(
role='Tech Content Strategist',
goal='Craft compelling content on ' + agenda,
backstory=(
"You are a renowned Tech Content Strategist, known for your insightful and engaging articles on science and innovation."
"With a deep understanding of the tech industry, you transform complex concepts into compelling narratives."
"For any web searches, be sure to scrape the website content from the Link in the search, but exclude PDFs"
),
verbose=True,
allow_delegation=True,
tools=[search_tool, scrape_tool, scrape_tool_bare],
cache=False, # Disable cache for this agent
step_callback=writer_callback
)
task1 = Task(
description=(
f"Conduct a comprehensive analysis of the latest in {agenda}"
"Identify key trends, breakthrough technologies, and potential industry impacts."
"Compile your findings in a detailed report and include references and links to the source material."
),
expected_output=f"A comprehensive full report on {agenda} in 2024, leave nothing out",
agent=researcher,
)
task2 = Task(
description=(
f"Using the insights from the researcher's report, develop an engaging blog post that highlights the most significant {agenda} ideas."
"Your post should be informative yet accessible, catering to a tech-savvy audience."
"Aim for a narrative that captures the essence of these breakthroughs and their implications for the future."
"Keep asking for research and revise until the minimum 5000 words are met"
"Include the research in the entirety along with the resulting report"
"Include at least five links to external pages or PDFs with an appropriate anchor tag in the final report"
),
expected_output=f"A compelling ten paragraphs blog post formatted as html to place inside the body tag with headings, subheadings, and a main thesis about the latest {agenda}",
agent=writer
)
# Define the main callback for the crew
def main_callback(output: tasks.task_output.TaskOutput):
update_task_status(app.current_task.request.id, 1, output.description)
# Instantiate your crew with a sequential process
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2],
verbose=4,
task_callback=main_callback
)
result = crew.kickoff()
# Save the result to MongoDB Atlas
content = {
"agenda": agenda,
"report": html.unescape(result),
"blog_post": html.unescape(result)
}
db.content.insert_one(content)
return result