import os import openai import uuid import requests from flask import Flask, request, jsonify, send_from_directory from flask_cors import CORS from qdrant_client import QdrantClient from qdrant_client.http import models from email.mime.text import MIMEText from airflow_client.client import ApiClient, Configuration from airflow_client.client.api.dag_run_api import DAGRunApi import smtplib from threading import Thread import time import json # Initialize Flask app app = Flask(__name__, static_folder='./frontend', static_url_path='/') CORS(app) # Configure OpenAI API openai.api_key = os.getenv('OPENAI_API_KEY') # Configure Qdrant qdrant = QdrantClient(host=os.getenv('QDRANT_HOST')) # Dictionary to store the status and progress of tasks tasks_status = {} def embed_text(text): response = openai.Embedding.create( input=text, model="text-embedding-ada-002" ) embedding = response['data'][0]['embedding'] return embedding def query_qdrant(embedding, top_n=5): search_result = qdrant.search( collection_name="rag", query_vector=embedding, limit=top_n ) return search_result def parse_react_response(response): steps = [] final_answer = "" lines = response.split('\n') for line in lines: if line.startswith("["): steps.append(line.strip()) elif line.startswith("Final Answer:"): final_answer = line.split(":")[1].strip() return steps, final_answer def update_task_status(task_id, status, step=None, results=[]): if task_id not in tasks_status: tasks_status[task_id] = {"status": status, "current_step": step, "steps": [], "results": []} else: tasks_status[task_id]["status"] = status if step: tasks_status[task_id]["current_step"] = step tasks_status[task_id]["steps"].append(step) tasks_status[task_id]["results"] = results def process_steps(steps, task_id, memory, results): try: for step in steps: if "[" in step and "]" in step: agent = step.split("[")[1].split("]")[0].strip().lower().replace(" ", "_") task = step.split("]")[1].strip() result = run_agent(agent, task, task_id, memory) if isinstance(result, tuple): result = result[0] results.append(result["message"]) update_task_status(task_id, "processing", step, results) memory[agent] = result["message"] # Store the result in memory update_task_status(task_id, "completed", None, results) except Exception as e: update_task_status(task_id, f"failed: {e}") print(f"Error processing steps: {e}") @app.route('/upload', methods=['POST']) def upload_file(): if 'file' not in request.files: return jsonify({"error": "No file part"}), 400 file = request.files['file'] if file.filename == '': return jsonify({"error": "No selected file"}), 400 if file and file.filename.endswith('.txt'): content = file.read().decode('utf-8') embedding = embed_text(content) document_id = str(uuid.uuid4()) # Generate a UUID for the document ID qdrant.upsert( collection_name='rag', points=[models.PointStruct(id=document_id, vector=embedding, payload={"content": content})] ) return jsonify({"message": "File uploaded and embedded successfully"}), 200 else: return jsonify({"error": "Invalid file type. Only .txt files are allowed"}), 400 @app.route('/query', methods=['POST']) def query(): data = request.json query_text = data['query'] embedding = embed_text(query_text) results = query_qdrant(embedding) sources = [{"content": result.payload["content"], "id": result.id} for result in results] return jsonify({"results": sources}) @app.route('/react_query', methods=['POST']) def react_query(): data = request.json query_text = data['query'] task_id = str(uuid.uuid4()) update_task_status(task_id, "initialized") # Create the system prompt with capabilities system_prompt = f""" You are a research assistant that can perform the following tasks: 1. Research Paper Finder 2. Citation Generator 3. Data Summarizer 4. Question Answering 5. Statistical Analysis 6. Graph Generator 7. Keyword Extractor 8. Research Outline Generator 9. Hypothesis Generator 10. Methodology Advisor 11. Experimental Design Helper 12. Survey Designer 13. Plagiarism Checker 14. Grammar and Style Checker 15. Literature Review Organizer 16. Data Cleaning Agent 17. Bibliography Manager 18. Thesis Statement Generator 19. Funding Finder 20. Conference Finder 21. Web Scraper 22. API Integrator 23. Email Notifier 24. File Converter 25. Translation Agent 26. OCR Agent 27. Scheduler 28. Weather Information Agent Using the ReAct (Reason and Act) paradigm, analyze the following query and determine the steps to answer it. Each step should indicate the agent to use and the task to perform in a structured format, clearly separated by new lines. Make sure to include the agent name in square brackets. Example format: [Agent] Task. Query: {query_text} """ response = openai.ChatCompletion.create( model="gpt-4", messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": query_text} ], max_tokens=500 ) react_response = response['choices'][0]['message']['content'].strip() steps, final_answer = parse_react_response(react_response) if not steps: update_task_status(task_id, "failed") return jsonify({"error": "No steps generated by the ReAct system"}), 400 update_task_status(task_id, "processing", steps[0]) results = [] memory = {} Thread(target=process_steps, args=(steps, task_id, memory, results)).start() return jsonify({"steps": steps, "task_id": task_id}) def run_agent(agent, query_text, task_id, memory): # Here we call the appropriate agent function based on the agent type if agent == 'research_paper_finder': return research_paper_finder(query_text, memory) elif agent == 'citation_generator': return citation_generator(query_text, memory) elif agent == 'data_summarizer': return data_summarizer(query_text, memory) elif agent == 'question_answering': return question_answering(query_text, memory) elif agent == 'statistical_analysis': return statistical_analysis(query_text, memory) elif agent == 'graph_generator': return graph_generator(query_text, memory) elif agent == 'keyword_extractor': return keyword_extractor(query_text, memory) elif agent == 'research_outline_generator': return research_outline_generator(query_text, memory) elif agent == 'hypothesis_generator': return hypothesis_generator(query_text, memory) elif agent == 'methodology_advisor': return methodology_advisor(query_text, memory) elif agent == 'experimental_design_helper': return experimental_design_helper(query_text, memory) elif agent == 'survey_designer': return survey_designer(query_text, memory) elif agent == 'plagiarism_checker': return plagiarism_checker(query_text, memory) elif agent == 'grammar_and_style_checker': return grammar_and_style_checker(query_text, memory) elif agent == 'literature_review_organizer': return literature_review_organizer(query_text, memory) elif agent == 'data_cleaning_agent': return data_cleaning_agent(query_text, memory) elif agent == 'bibliography_manager': return bibliography_manager(query_text, memory) elif agent == 'thesis_statement_generator': return thesis_statement_generator(query_text, memory) elif agent == 'funding_finder': return funding_finder(query_text, memory) elif agent == 'conference_finder': return conference_finder(query_text, memory) elif agent == 'web_scraper_using_scrapyd' or 'web_scraper': return web_scraper(query_text, memory) elif agent == 'api_integrator': return api_integrator(query_text, memory) elif agent == 'email_notifier': return email_notifier(query_text, memory) elif agent == 'file_converter': return file_converter(query_text, memory) elif agent == 'translation_agent': return translation_agent(query_text, memory) elif agent == 'ocr_agent': return ocr_agent(query_text, memory) elif agent == 'scheduler': return scheduler(query_text, memory) elif agent == 'weather_information_agent': return weather_information_agent(query_text, memory) elif agent == 'currency_converter': return currency_converter(query_text, memory) elif agent == 'news_aggregator': return news_aggregator(query_text, memory) else: return {"message": f"Unknown agent: {agent}"} def research_paper_finder(query_text, memory): embedding = embed_text(query_text) rag_results = query_qdrant(embedding) sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results] response = openai.ChatCompletion.create( model="gpt-4", messages=[ {"role": "system", "content": f"The previous response relating to the query was: {memory}"}, {"role": "user", "content": f"Find research papers related to: {query_text}"} ], max_tokens=150 ) response_message = response['choices'][0]['message']['content'].strip() return {"message": response_message, "sources": sources} def citation_generator(query_text, memory): embedding = embed_text(query_text) rag_results = query_qdrant(embedding) sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results] response = openai.ChatCompletion.create( model="gpt-4", messages=[ {"role": "system", "content": f"The previous response relating to the query was: {memory}"}, {"role": "user", "content": f"Generate a citation for: {query_text}"} ], max_tokens=50 ) response_message = response['choices'][0]['message']['content'].strip() return {"message": response_message, "sources": sources} def data_summarizer(query_text, memory): embedding = embed_text(query_text) rag_results = query_qdrant(embedding) sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results] response = openai.ChatCompletion.create( model="gpt-4", messages=[ {"role": "system", "content": f"The previous response relating to the query was: {memory}"}, {"role": "user", "content": f"Summarize the following text: {query_text}"} ], max_tokens=1000 ) response_message = response['choices'][0]['message']['content'].strip() return {"message": response_message, "sources": sources} def question_answering(query_text, memory): embedding = embed_text(query_text) rag_results = query_qdrant(embedding) sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results] response = openai.ChatCompletion.create( model="gpt-4", messages=[ {"role": "system", "content": f"The previous response relating to the query was: {memory}"}, {"role": "user", "content": f"Answer the following question: {query_text}"} ], max_tokens=100 ) response_message = response['choices'][0]['message']['content'].strip() return {"message": response_message, "sources": sources} def statistical_analysis(query_text, memory): embedding = embed_text(query_text) rag_results = query_qdrant(embedding) sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results] response = openai.ChatCompletion.create( model="gpt-4", messages=[ {"role": "system", "content": f"The previous response relating to the query was: {memory}"}, {"role": "user", "content": f"Perform statistical analysis on the following data: {query_text}"} ], max_tokens=150 ) response_message = response['choices'][0]['message']['content'].strip() return {"message": response_message, "sources": sources} def graph_generator(query_text, memory): embedding = embed_text(query_text) rag_results = query_qdrant(embedding) sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results] response = openai.ChatCompletion.create( model="gpt-4", messages=[ {"role": "system", "content": f"The previous response relating to the query was: {memory}"}, {"role": "user", "content": f"Generate a graph for the following data: {query_text}"} ], max_tokens=150 ) response_message = response['choices'][0]['message']['content'].strip() return {"message": response_message, "sources": sources} def keyword_extractor(query_text, memory): embedding = embed_text(query_text) rag_results = query_qdrant(embedding) sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results] response = openai.ChatCompletion.create( model="gpt-4", messages=[ {"role": "system", "content": f"The previous response relating to the query was: {memory}"}, {"role": "user", "content": f"Extract keywords from the following text: {query_text}"} ], max_tokens=50 ) response_message = response['choices'][0]['message']['content'].strip() return {"message": response_message, "sources": sources} def research_outline_generator(query_text, memory): embedding = embed_text(query_text) rag_results = query_qdrant(embedding) sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results] response = openai.ChatCompletion.create( model="gpt-4", messages=[ {"role": "system", "content": f"The previous response relating to the query was: {memory}"}, {"role": "user", "content": f"Generate a research outline for: {query_text}"} ], max_tokens=150 ) response_message = response['choices'][0]['message']['content'].strip() return {"message": response_message, "sources": sources} def hypothesis_generator(query_text, memory): embedding = embed_text(query_text) rag_results = query_qdrant(embedding) sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results] response = openai.ChatCompletion.create( model="gpt-4", messages=[ {"role": "system", "content": f"The previous response relating to the query was: {memory}"}, {"role": "user", "content": f"Generate a hypothesis based on the following topic: {query_text}"} ], max_tokens=100 ) response_message = response['choices'][0]['message']['content'].strip() return {"message": response_message, "sources": sources} def methodology_advisor(query_text, memory): embedding = embed_text(query_text) rag_results = query_qdrant(embedding) sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results] response = openai.ChatCompletion.create( model="gpt-4", messages=[ {"role": "system", "content": f"The previous response relating to the query was: {memory}"}, {"role": "user", "content": f"Suggest a methodology for the following research topic: {query_text}"} ], max_tokens=150 ) response_message = response['choices'][0]['message']['content'].strip() return {"message": response_message, "sources": sources} def experimental_design_helper(query_text, memory): embedding = embed_text(query_text) rag_results = query_qdrant(embedding) sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results] response = openai.ChatCompletion.create( model="gpt-4", messages=[ {"role": "system", "content": f"The previous response relating to the query was: {memory}"}, {"role": "user", "content": f"Help design an experiment for: {query_text}"} ], max_tokens=150 ) response_message = response['choices'][0]['message']['content'].strip() return {"message": response_message, "sources": sources} def survey_designer(query_text, memory): embedding = embed_text(query_text) rag_results = query_qdrant(embedding) sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results] response = openai.ChatCompletion.create( model="gpt-4", messages=[ {"role": "system", "content": f"The previous response relating to the query was: {memory}"}, {"role": "user", "content": f"Design a survey for: {query_text}"} ], max_tokens=150 ) response_message = response['choices'][0]['message']['content'].strip() return {"message": response_message, "sources": sources} def plagiarism_checker(query_text, memory): return {"message": "Plagiarism check is not implemented yet.", "query": query_text} def grammar_and_style_checker(query_text, memory): embedding = embed_text(query_text) rag_results = query_qdrant(embedding) sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results] response = openai.ChatCompletion.create( model="gpt-4", messages=[ {"role": "system", "content": f"The previous response relating to the query was: {memory}"}, {"role": "user", "content": f"Check and correct the grammar and style of the following text: {query_text}"} ], max_tokens=150 ) response_message = response['choices'][0]['message']['content'].strip() return {"message": response_message, "sources": sources} def literature_review_organizer(query_text, memory): embedding = embed_text(query_text) rag_results = query_qdrant(embedding) sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results] response = openai.ChatCompletion.create( model="gpt-4", messages=[ {"role": "system", "content": f"The previous response relating to the query was: {memory}"}, {"role": "user", "content": f"Organize the following literature review: {query_text}"} ], max_tokens=150 ) response_message = response['choices'][0]['message']['content'].strip() return {"message": response_message, "sources": sources} def data_cleaning_agent(query_text, memory): embedding = embed_text(query_text) rag_results = query_qdrant(embedding) sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results] response = openai.ChatCompletion.create( model="gpt-4", messages=[ {"role": "system", "content": f"The previous response relating to the query was: {memory}"}, {"role": "user", "content": f"Clean the following data: {query_text}"} ], max_tokens=150 ) response_message = response['choices'][0]['message']['content'].strip() return {"message": response_message, "sources": sources} def bibliography_manager(query_text, memory): embedding = embed_text(query_text) rag_results = query_qdrant(embedding) sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results] response = openai.ChatCompletion.create( model="gpt-4", messages=[ {"role": "system", "content": f"The previous response relating to the query was: {memory}"}, {"role": "user", "content": f"Manage the bibliography for: {query_text}"} ], max_tokens=150 ) response_message = response['choices'][0]['message']['content'].strip() return {"message": response_message, "sources": sources} def thesis_statement_generator(query_text, memory): embedding = embed_text(query_text) rag_results = query_qdrant(embedding) sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results] response = openai.ChatCompletion.create( model="gpt-4", messages=[ {"role": "system", "content": f"The previous response relating to the query was: {memory}"}, {"role": "user", "content": f"Generate a thesis statement for: {query_text}"} ], max_tokens=100 ) response_message = response['choices'][0]['message']['content'].strip() return {"message": response_message, "sources": sources} def funding_finder(query_text, memory): embedding = embed_text(query_text) rag_results = query_qdrant(embedding) sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results] response = openai.ChatCompletion.create( model="gpt-4", messages=[ {"role": "system", "content": f"The previous response relating to the query was: {memory}"}, {"role": "user", "content": f"Find funding opportunities for: {query_text}"} ], max_tokens=150 ) response_message = response['choices'][0]['message']['content'].strip() return {"message": response_message, "sources": sources} def conference_finder(query_text, memory): embedding = embed_text(query_text) rag_results = query_qdrant(embedding) sources = [{"content": result.payload["content"], "id": result.id} for result in rag_results] response = openai.ChatCompletion.create( model="gpt-4", messages=[ {"role": "system", "content": f"The previous response relating to the query was: {memory}"}, {"role": "user", "content": f"Find conferences related to: {query_text}"} ], max_tokens=150 ) response_message = response['choices'][0]['message']['content'].strip() return {"message": response_message, "sources": sources} def web_scraper(query_text, memory): project_name = 'my_project' spider_name = 'my_spider' scrapyd_host = os.getenv('SCRAPYD_HOST', 'localhost') data = { 'project': project_name, 'spider': spider_name, 'start_urls': query_text } try: response = requests.post(f'http://{scrapyd_host}:6800/schedule.json', data=data) # if response.status_code == "200": job_id = response.json().get('jobid') # Wait for the job to finish and fetch the results time.sleep(15) # Adjust this sleep time as needed items_response = requests.get(f'http://{scrapyd_host}:6800/items/{project_name}/{spider_name}/{job_id}.jl') #if items_response.status_code == 200: items = [json.loads(line) for line in items_response.text.splitlines()] # for item in items: # Insert each scraped item into Qdrant content = items[0].get('content', '') embedding = embed_text(content) document_id = str(uuid.uuid4()) qdrant.upsert( collection_name='rag', points=[models.PointStruct(id=document_id, vector=embedding, payload={"content": content})] ) return {"message": content} # return {"message": f"Job completed with {len(items)} items scraped", "items": items} # else: # return {"message": "Failed to fetch scraped items"}, 500 #else: # return {"message": "Failed to schedule job"}, 500 except Exception as e: print(f"Error scheduling scrapy job: {e}") return {"message": f"Failed to schedule job - {e}"}, 500 def api_integrator(query_text, memory): response = requests.post( 'http://localhost:1880/api_integrator', json={'query': query_text} ) return {"message": response.json(), "query": query_text} def email_notifier(query_text, memory): msg = MIMEText(query_text) msg['Subject'] = 'Notification' msg['From'] = 'test@example.com' msg['To'] = 'mahesh.kommareddi@gmail.com' with smtplib.SMTP('mailhog', 1025) as server: server.sendmail(msg['From'], [msg['To']], msg.as_string()) return {"message": "Email sent successfully"} def file_converter(query_text, memory): response = requests.post( 'http://libreoffice:8084/convert', files={'file': query_text} ) return {"message": "File conversion completed", "data": response.json()} def translation_agent(query_text, memory): response = openai.ChatCompletion.create( model="gpt-4", messages=[ {"role": "system", "content": f"The previous response relating to the query was: {memory}"}, {"role": "user", "content": f"Translate the following text: {query_text}"} ], max_tokens=150 ) response_message = response['choices'][0]['message']['content'].strip() return {"message": response_message, "sources": sources} def ocr_agent(query_text, memory): response = requests.post( 'http://localhost:8081/ocr', files={'file': query_text} ) return {"message": response.json(), "query": query_text} def scheduler(query_text, memory): configuration = Configuration( host="http://localhost:8082/api/v1" ) api_client = ApiClient(configuration) dag_run_api = DAGRunApi(api_client) dag_id = 'example_dag' dag_run = dag_run_api.post_dag_run( dag_id=dag_id, dag_run={"conf": {"query_text": query_text}} ) return {"message": f"Scheduled task for {query_text}", "dag_run_id": dag_run.dag_run_id} def weather_information_agent(query_text, memory): api_key = os.getenv('OPENWEATHERMAP_API_KEY') response = requests.get( f'http://api.openweathermap.org/data/2.5/weather?q={query_text}&appid={api_key}' ) return {"message": response.json(), "query": query_text} @app.route('/ocr', methods=['POST']) def handle_ocr(): if 'file' not in request.files: return jsonify({"error": "No file part"}), 400 file = request.files['file'] if file.filename == '': return jsonify({"error": "No selected file"}), 400 response = requests.post( 'http://localhost:8081/ocr', files={'file': file} ) return jsonify(response.json()) @app.route('/schedule', methods=['POST']) def handle_schedule(): data = request.json query_text = data['query'] return jsonify(scheduler(query_text)) @app.route('/weather', methods=['POST']) def handle_weather(): data = request.json query_text = data['query'] return jsonify(weather_information_agent(query_text)) @app.route('/scrape', methods=['POST']) def handle_scrape(): data = request.json query_text = data['query'] return web_scraper(query_text, {}) @app.route('/integrate', methods=['POST']) def handle_integrate(): data = request.json query_text = data['query'] return jsonify(api_integrator(query_text)) @app.route('/notify', methods=['POST']) def handle_notify(): data = request.json query_text = data['query'] return jsonify(email_notifier(query_text)) @app.route('/convert', methods=['POST']) def handle_convert(): if 'file' not in request.files: return jsonify({"error": "No file part"}), 400 file = request.files['file'] if file.filename == '': return jsonify({"error": "No selected file"}), 400 response = requests.post( 'http://localhost:8084/convert', files={'file': file} ) return jsonify(response.json()) @app.route('/') def serve_index(): return send_from_directory(app.static_folder, 'index.html') @app.route('/status/', methods=['GET']) def get_status(task_id): return jsonify(tasks_status.get(task_id, {"error": "Task ID not found"})) if __name__ == '__main__': app.run(host='0.0.0.0', port=1337)