# agents/app/rag_system_with_agents.py
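"""Flask-based RAG service with a ReAct-style multi-agent pipeline.

Documents are embedded with OpenAI's text-embedding-ada-002 model and stored
in a Qdrant collection named 'rag'. The /react_query endpoint asks GPT-4 to
break a query into "[Agent] Task" steps, which a background thread dispatches
to the agent functions below; per-task progress is tracked in an in-memory
dictionary keyed by task ID.
"""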
import os
import openai
import uuid
import requests
from flask import Flask, request, jsonify, send_from_directory
from flask_cors import CORS
from qdrant_client import QdrantClient
from qdrant_client.http import models
from email.mime.text import MIMEText
from airflow_client.client import ApiClient, Configuration
from airflow_client.client.api.dag_run_api import DAGRunApi
import smtplib
from threading import Thread
import time
import json
# Initialize Flask app
app = Flask(__name__, static_folder='./frontend', static_url_path='/')
CORS(app)
# Configure OpenAI API
openai.api_key = os.getenv('OPENAI_API_KEY')
# Configure Qdrant
qdrant = QdrantClient(host=os.getenv('QDRANT_HOST'))
# Dictionary to store the status and progress of tasks
tasks_status = {}
def embed_text(text):
    response = openai.Embedding.create(
        input=text,
        model="text-embedding-ada-002"
    )
    embedding = response['data'][0]['embedding']
    return embedding

def query_qdrant(embedding, top_n=5):
    search_result = qdrant.search(
        collection_name="rag",
        query_vector=embedding,
        limit=top_n
    )
    return search_result

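# Illustrative usage of the two helpers above: embed a query string, then pull
# its nearest neighbours out of the 'rag' collection.
#
#   hits = query_qdrant(embed_text("vector databases"), top_n=3)
#   contents = [hit.payload["content"] for hit in hits]
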
def parse_react_response(response):
    steps = []
    final_answer = ""
    lines = response.split('\n')
    for line in lines:
        if line.startswith("["):
            steps.append(line.strip())
        elif line.startswith("Final Answer:"):
            # Split on the first colon only, so answers containing colons survive
            final_answer = line.split(":", 1)[1].strip()
    return steps, final_answer

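# Example (hypothetical) model response that the parser above handles:
#
#   [Research Paper Finder] Find recent papers on retrieval-augmented generation.
#   [Data Summarizer] Summarize the abstracts of the papers found.
#   Final Answer: A short synthesis of the retrieved papers.
#
# This yields two bracketed steps and
# final_answer == "A short synthesis of the retrieved papers."
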
def update_task_status(task_id, status, step=None, results=None):
    # Avoid a mutable default argument; treat a missing results list as empty
    if results is None:
        results = []
    if task_id not in tasks_status:
        tasks_status[task_id] = {"status": status, "current_step": step, "steps": [], "results": []}
    else:
        tasks_status[task_id]["status"] = status
    if step:
        tasks_status[task_id]["current_step"] = step
        tasks_status[task_id]["steps"].append(step)
    tasks_status[task_id]["results"] = results

def process_steps(steps, task_id, memory, results):
    try:
        for step in steps:
            if "[" in step and "]" in step:
                agent = step.split("[")[1].split("]")[0].strip().lower().replace(" ", "_")
                task = step.split("]")[1].strip()
                result = run_agent(agent, task, task_id, memory)
                if isinstance(result, tuple):
                    result = result[0]
                results.append(result["message"])
                update_task_status(task_id, "processing", step, results)
                memory[agent] = result["message"]  # Store the result in memory
        update_task_status(task_id, "completed", None, results)
    except Exception as e:
        update_task_status(task_id, f"failed: {e}")
        print(f"Error processing steps: {e}")

@app.route('/upload', methods=['POST'])
def upload_file():
    if 'file' not in request.files:
        return jsonify({"error": "No file part"}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({"error": "No selected file"}), 400
    if file and file.filename.endswith('.txt'):
        content = file.read().decode('utf-8')
        embedding = embed_text(content)
        document_id = str(uuid.uuid4())  # Generate a UUID for the document ID
        qdrant.upsert(
            collection_name='rag',
            points=[models.PointStruct(id=document_id, vector=embedding, payload={"content": content})]
        )
        return jsonify({"message": "File uploaded and embedded successfully"}), 200
    else:
        return jsonify({"error": "Invalid file type. Only .txt files are allowed"}), 400

@app.route('/query', methods=['POST'])
def query():
    data = request.json
    query_text = data['query']
    embedding = embed_text(query_text)
    results = query_qdrant(embedding)
    sources = [{"content": result.payload["content"], "id": result.id} for result in results]
    return jsonify({"results": sources})

@app.route('/react_query', methods=['POST'])
def react_query():
    data = request.json
    query_text = data['query']
    task_id = str(uuid.uuid4())
    update_task_status(task_id, "initialized")
    # Create the system prompt with capabilities
    system_prompt = f"""
You are a research assistant that can perform the following tasks:
1. Research Paper Finder
2. Citation Generator
3. Data Summarizer
4. Question Answering
5. Statistical Analysis
6. Graph Generator
7. Keyword Extractor
8. Research Outline Generator
9. Hypothesis Generator
10. Methodology Advisor
11. Experimental Design Helper
12. Survey Designer
13. Plagiarism Checker
14. Grammar and Style Checker
15. Literature Review Organizer
16. Data Cleaning Agent
17. Bibliography Manager
18. Thesis Statement Generator
19. Funding Finder
20. Conference Finder
21. Web Scraper
22. API Integrator
23. Email Notifier
24. File Converter
25. Translation Agent
26. OCR Agent
27. Scheduler
28. Weather Information Agent
Using the ReAct (Reason and Act) paradigm, analyze the following query and determine the steps to answer it. Each step should indicate the agent to use and the task to perform in a structured format, clearly separated by new lines. Make sure to include the agent name in square brackets. Example format: [Agent] Task.
Query: {query_text}
"""
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": query_text}
        ],
        max_tokens=500
    )
    react_response = response['choices'][0]['message']['content'].strip()
    steps, final_answer = parse_react_response(react_response)
    if not steps:
        update_task_status(task_id, "failed")
        return jsonify({"error": "No steps generated by the ReAct system"}), 400
    update_task_status(task_id, "processing", steps[0])
    results = []
    memory = {}
    Thread(target=process_steps, args=(steps, task_id, memory, results)).start()
    return jsonify({"steps": steps, "task_id": task_id})

def run_agent(agent, query_text, task_id, memory):
    # Dispatch table mapping the agent name parsed from a ReAct step to its
    # handler. Both scraper spellings the model may emit map to the same
    # function. (The original if/elif chain compared with
    # `agent == 'web_scraper_using_scrapyd' or 'web_scraper'`, which is always
    # truthy and swallowed every agent after it.)
    handlers = {
        'research_paper_finder': research_paper_finder,
        'citation_generator': citation_generator,
        'data_summarizer': data_summarizer,
        'question_answering': question_answering,
        'statistical_analysis': statistical_analysis,
        'graph_generator': graph_generator,
        'keyword_extractor': keyword_extractor,
        'research_outline_generator': research_outline_generator,
        'hypothesis_generator': hypothesis_generator,
        'methodology_advisor': methodology_advisor,
        'experimental_design_helper': experimental_design_helper,
        'survey_designer': survey_designer,
        'plagiarism_checker': plagiarism_checker,
        'grammar_and_style_checker': grammar_and_style_checker,
        'literature_review_organizer': literature_review_organizer,
        'data_cleaning_agent': data_cleaning_agent,
        'bibliography_manager': bibliography_manager,
        'thesis_statement_generator': thesis_statement_generator,
        'funding_finder': funding_finder,
        'conference_finder': conference_finder,
        'web_scraper_using_scrapyd': web_scraper,
        'web_scraper': web_scraper,
        'api_integrator': api_integrator,
        'email_notifier': email_notifier,
        'file_converter': file_converter,
        'translation_agent': translation_agent,
        'ocr_agent': ocr_agent,
        'scheduler': scheduler,
        'weather_information_agent': weather_information_agent,
    }
    handler = handlers.get(agent)
    if handler is None:
        return {"message": f"Unknown agent: {agent}"}
    return handler(query_text, memory)

def research_paper_finder(query_text, memory):
    embedding = embed_text(query_text)
    rag_results = query_qdrant(embedding)
    sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results]
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"The previous response relating to the query was: {memory}"},
            {"role": "user", "content": f"Find research papers related to: {query_text}"}
        ],
        max_tokens=150
    )
    response_message = response['choices'][0]['message']['content'].strip()
    return {"message": response_message, "sources": sources}

def citation_generator(query_text, memory):
    embedding = embed_text(query_text)
    rag_results = query_qdrant(embedding)
    sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results]
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"The previous response relating to the query was: {memory}"},
            {"role": "user", "content": f"Generate a citation for: {query_text}"}
        ],
        max_tokens=50
    )
    response_message = response['choices'][0]['message']['content'].strip()
    return {"message": response_message, "sources": sources}

def data_summarizer(query_text, memory):
    embedding = embed_text(query_text)
    rag_results = query_qdrant(embedding)
    sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results]
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"The previous response relating to the query was: {memory}"},
            {"role": "user", "content": f"Summarize the following text: {query_text}"}
        ],
        max_tokens=1000
    )
    response_message = response['choices'][0]['message']['content'].strip()
    return {"message": response_message, "sources": sources}

def question_answering(query_text, memory):
    embedding = embed_text(query_text)
    rag_results = query_qdrant(embedding)
    sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results]
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"The previous response relating to the query was: {memory}"},
            {"role": "user", "content": f"Answer the following question: {query_text}"}
        ],
        max_tokens=100
    )
    response_message = response['choices'][0]['message']['content'].strip()
    return {"message": response_message, "sources": sources}

def statistical_analysis(query_text, memory):
    embedding = embed_text(query_text)
    rag_results = query_qdrant(embedding)
    sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results]
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"The previous response relating to the query was: {memory}"},
            {"role": "user", "content": f"Perform statistical analysis on the following data: {query_text}"}
        ],
        max_tokens=150
    )
    response_message = response['choices'][0]['message']['content'].strip()
    return {"message": response_message, "sources": sources}

def graph_generator(query_text, memory):
    embedding = embed_text(query_text)
    rag_results = query_qdrant(embedding)
    sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results]
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"The previous response relating to the query was: {memory}"},
            {"role": "user", "content": f"Generate a graph for the following data: {query_text}"}
        ],
        max_tokens=150
    )
    response_message = response['choices'][0]['message']['content'].strip()
    return {"message": response_message, "sources": sources}

def keyword_extractor(query_text, memory):
    embedding = embed_text(query_text)
    rag_results = query_qdrant(embedding)
    sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results]
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"The previous response relating to the query was: {memory}"},
            {"role": "user", "content": f"Extract keywords from the following text: {query_text}"}
        ],
        max_tokens=50
    )
    response_message = response['choices'][0]['message']['content'].strip()
    return {"message": response_message, "sources": sources}

def research_outline_generator(query_text, memory):
    embedding = embed_text(query_text)
    rag_results = query_qdrant(embedding)
    sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results]
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"The previous response relating to the query was: {memory}"},
            {"role": "user", "content": f"Generate a research outline for: {query_text}"}
        ],
        max_tokens=150
    )
    response_message = response['choices'][0]['message']['content'].strip()
    return {"message": response_message, "sources": sources}

def hypothesis_generator(query_text, memory):
    embedding = embed_text(query_text)
    rag_results = query_qdrant(embedding)
    sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results]
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"The previous response relating to the query was: {memory}"},
            {"role": "user", "content": f"Generate a hypothesis based on the following topic: {query_text}"}
        ],
        max_tokens=100
    )
    response_message = response['choices'][0]['message']['content'].strip()
    return {"message": response_message, "sources": sources}

def methodology_advisor(query_text, memory):
    embedding = embed_text(query_text)
    rag_results = query_qdrant(embedding)
    sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results]
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"The previous response relating to the query was: {memory}"},
            {"role": "user", "content": f"Suggest a methodology for the following research topic: {query_text}"}
        ],
        max_tokens=150
    )
    response_message = response['choices'][0]['message']['content'].strip()
    return {"message": response_message, "sources": sources}

def experimental_design_helper(query_text, memory):
    embedding = embed_text(query_text)
    rag_results = query_qdrant(embedding)
    sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results]
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"The previous response relating to the query was: {memory}"},
            {"role": "user", "content": f"Help design an experiment for: {query_text}"}
        ],
        max_tokens=150
    )
    response_message = response['choices'][0]['message']['content'].strip()
    return {"message": response_message, "sources": sources}

def survey_designer(query_text, memory):
    embedding = embed_text(query_text)
    rag_results = query_qdrant(embedding)
    sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results]
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"The previous response relating to the query was: {memory}"},
            {"role": "user", "content": f"Design a survey for: {query_text}"}
        ],
        max_tokens=150
    )
    response_message = response['choices'][0]['message']['content'].strip()
    return {"message": response_message, "sources": sources}

def plagiarism_checker(query_text, memory):
    return {"message": "Plagiarism check is not implemented yet.", "query": query_text}

def grammar_and_style_checker(query_text, memory):
    embedding = embed_text(query_text)
    rag_results = query_qdrant(embedding)
    sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results]
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"The previous response relating to the query was: {memory}"},
            {"role": "user", "content": f"Check and correct the grammar and style of the following text: {query_text}"}
        ],
        max_tokens=150
    )
    response_message = response['choices'][0]['message']['content'].strip()
    return {"message": response_message, "sources": sources}

def literature_review_organizer(query_text, memory):
    embedding = embed_text(query_text)
    rag_results = query_qdrant(embedding)
    sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results]
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"The previous response relating to the query was: {memory}"},
            {"role": "user", "content": f"Organize the following literature review: {query_text}"}
        ],
        max_tokens=150
    )
    response_message = response['choices'][0]['message']['content'].strip()
    return {"message": response_message, "sources": sources}

def data_cleaning_agent(query_text, memory):
    embedding = embed_text(query_text)
    rag_results = query_qdrant(embedding)
    sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results]
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"The previous response relating to the query was: {memory}"},
            {"role": "user", "content": f"Clean the following data: {query_text}"}
        ],
        max_tokens=150
    )
    response_message = response['choices'][0]['message']['content'].strip()
    return {"message": response_message, "sources": sources}

def bibliography_manager(query_text, memory):
    embedding = embed_text(query_text)
    rag_results = query_qdrant(embedding)
    sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results]
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"The previous response relating to the query was: {memory}"},
            {"role": "user", "content": f"Manage the bibliography for: {query_text}"}
        ],
        max_tokens=150
    )
    response_message = response['choices'][0]['message']['content'].strip()
    return {"message": response_message, "sources": sources}

def thesis_statement_generator(query_text, memory):
    embedding = embed_text(query_text)
    rag_results = query_qdrant(embedding)
    sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results]
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"The previous response relating to the query was: {memory}"},
            {"role": "user", "content": f"Generate a thesis statement for: {query_text}"}
        ],
        max_tokens=100
    )
    response_message = response['choices'][0]['message']['content'].strip()
    return {"message": response_message, "sources": sources}

def funding_finder(query_text, memory):
    embedding = embed_text(query_text)
    rag_results = query_qdrant(embedding)
    sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results]
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"The previous response relating to the query was: {memory}"},
            {"role": "user", "content": f"Find funding opportunities for: {query_text}"}
        ],
        max_tokens=150
    )
    response_message = response['choices'][0]['message']['content'].strip()
    return {"message": response_message, "sources": sources}

def conference_finder(query_text, memory):
    embedding = embed_text(query_text)
    rag_results = query_qdrant(embedding)
    sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results]
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"The previous response relating to the query was: {memory}"},
            {"role": "user", "content": f"Find conferences related to: {query_text}"}
        ],
        max_tokens=150
    )
    response_message = response['choices'][0]['message']['content'].strip()
    return {"message": response_message, "sources": sources}

def web_scraper(query_text, memory):
    project_name = 'my_project'
    spider_name = 'my_spider'
    scrapyd_host = os.getenv('SCRAPYD_HOST', 'localhost')
    data = {
        'project': project_name,
        'spider': spider_name,
        'start_urls': query_text
    }
    try:
        # Schedule the spider via Scrapyd's schedule.json endpoint
        response = requests.post(f'http://{scrapyd_host}:6800/schedule.json', data=data)
        response.raise_for_status()
        job_id = response.json().get('jobid')
        # Wait for the job to finish and fetch the results
        time.sleep(15)  # Adjust this sleep time as needed
        items_response = requests.get(f'http://{scrapyd_host}:6800/items/{project_name}/{spider_name}/{job_id}.jl')
        items_response.raise_for_status()
        items = [json.loads(line) for line in items_response.text.splitlines()]
        # Insert the first scraped item into Qdrant
        content = items[0].get('content', '') if items else ''
        embedding = embed_text(content)
        document_id = str(uuid.uuid4())
        qdrant.upsert(
            collection_name='rag',
            points=[models.PointStruct(id=document_id, vector=embedding, payload={"content": content})]
        )
        return {"message": content}
    except Exception as e:
        print(f"Error scheduling scrapy job: {e}")
        return {"message": f"Failed to schedule job - {e}"}, 500

def api_integrator(query_text, memory):
    response = requests.post(
        'http://localhost:1880/api_integrator',
        json={'query': query_text}
    )
    return {"message": response.json(), "query": query_text}

def email_notifier(query_text, memory):
    msg = MIMEText(query_text)
    msg['Subject'] = 'Notification'
    msg['From'] = 'test@example.com'
    msg['To'] = 'mahesh.kommareddi@gmail.com'
    # MailHog's default SMTP port is 1025
    with smtplib.SMTP('mailhog', 1025) as server:
        server.sendmail(msg['From'], [msg['To']], msg.as_string())
    return {"message": "Email sent successfully"}

def file_converter(query_text, memory):
    response = requests.post(
        'http://libreoffice:8084/convert',
        files={'file': query_text}
    )
    return {"message": "File conversion completed", "data": response.json()}

def translation_agent(query_text, memory):
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"The previous response relating to the query was: {memory}"},
            {"role": "user", "content": f"Translate the following text: {query_text}"}
        ],
        max_tokens=150
    )
    response_message = response['choices'][0]['message']['content'].strip()
    # This agent does no RAG lookup, so there are no sources to return
    return {"message": response_message}

def ocr_agent(query_text, memory):
    response = requests.post(
        'http://localhost:8081/ocr',
        files={'file': query_text}
    )
    return {"message": response.json(), "query": query_text}

def scheduler(query_text, memory):
    configuration = Configuration(
        host="http://localhost:8082/api/v1"
    )
    api_client = ApiClient(configuration)
    dag_run_api = DAGRunApi(api_client)
    dag_id = 'example_dag'
    dag_run = dag_run_api.post_dag_run(
        dag_id=dag_id,
        dag_run={"conf": {"query_text": query_text}}
    )
    return {"message": f"Scheduled task for {query_text}", "dag_run_id": dag_run.dag_run_id}

def weather_information_agent(query_text, memory):
    api_key = os.getenv('OPENWEATHERMAP_API_KEY')
    response = requests.get(
        f'http://api.openweathermap.org/data/2.5/weather?q={query_text}&appid={api_key}'
    )
    return {"message": response.json(), "query": query_text}

@app.route('/ocr', methods=['POST'])
def handle_ocr():
    if 'file' not in request.files:
        return jsonify({"error": "No file part"}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({"error": "No selected file"}), 400
    response = requests.post(
        'http://localhost:8081/ocr',
        files={'file': file}
    )
    return jsonify(response.json())

@app.route('/schedule', methods=['POST'])
def handle_schedule():
    data = request.json
    query_text = data['query']
    # Pass an empty memory dict; these direct routes run outside the ReAct loop
    return jsonify(scheduler(query_text, {}))

@app.route('/weather', methods=['POST'])
def handle_weather():
    data = request.json
    query_text = data['query']
    return jsonify(weather_information_agent(query_text, {}))

@app.route('/scrape', methods=['POST'])
def handle_scrape():
    data = request.json
    query_text = data['query']
    return web_scraper(query_text, {})

@app.route('/integrate', methods=['POST'])
def handle_integrate():
    data = request.json
    query_text = data['query']
    return jsonify(api_integrator(query_text, {}))

@app.route('/notify', methods=['POST'])
def handle_notify():
    data = request.json
    query_text = data['query']
    return jsonify(email_notifier(query_text, {}))

@app.route('/convert', methods=['POST'])
def handle_convert():
    if 'file' not in request.files:
        return jsonify({"error": "No file part"}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({"error": "No selected file"}), 400
    response = requests.post(
        'http://localhost:8084/convert',
        files={'file': file}
    )
    return jsonify(response.json())

@app.route('/')
def serve_index():
    return send_from_directory(app.static_folder, 'index.html')

@app.route('/status/<task_id>', methods=['GET'])
def get_status(task_id):
    return jsonify(tasks_status.get(task_id, {"error": "Task ID not found"}))

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=1337)