# core-crew/ecom.py
# (479 lines, 17 KiB, Python)

import ast
import os
import sqlite3

import langchain_core
from crewai import Process, Agent, Task, Crew, tasks
from crewai_tools import BaseTool
# Configure the OpenAI-compatible client through environment variables:
# API key, default model name, and the base URL of the chat gateway.
# NOTE(review): the key is hard-coded in source — confirm it is only a placeholder.
os.environ.update({
    "OPENAI_API_KEY": "sk-kkk",
    "OPENAI_MODEL_NAME": "anthropic.claude-3-sonnet-20240229-v1:0",
    "OPENAI_API_BASE": "http://chat.the.mk:1337",
})
# Initialize LLM
from langchain_openai import ChatOpenAI
# Chat model client: same model name and gateway URL as the environment
# settings, with a low temperature.
llm = ChatOpenAI(
    base_url="http://chat.the.mk:1337",
    model="anthropic.claude-3-sonnet-20240229-v1:0",
    temperature=0.1,
)
# Open the SQLite database file (created on first use) and the single cursor
# that every helper function in this module shares.
conn = sqlite3.connect('your_database.db')
c = conn.cursor()

# Create tables for customers, orders, payments, inventory, and shipping.
c.execute('''CREATE TABLE IF NOT EXISTS customers
             (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, email TEXT)''')
c.execute('''CREATE TABLE IF NOT EXISTS orders
             (id INTEGER PRIMARY KEY AUTOINCREMENT, customer_id INTEGER, items TEXT, total REAL)''')
c.execute('''CREATE TABLE IF NOT EXISTS payments
             (id INTEGER PRIMARY KEY AUTOINCREMENT, order_id INTEGER, amount REAL, status TEXT)''')
c.execute('''CREATE TABLE IF NOT EXISTS inventory
             (id INTEGER PRIMARY KEY AUTOINCREMENT, product_id TEXT, quantity INTEGER)''')
# The shipping table carries the package weight and dimensions that
# create_shipping() inserts and get_shipping() reads back by position.
c.execute('''CREATE TABLE IF NOT EXISTS shipping
             (id INTEGER PRIMARY KEY AUTOINCREMENT, order_id INTEGER, shipping_address TEXT,
              weight REAL, length REAL, width REAL, height REAL)''')

# CREATE TABLE IF NOT EXISTS leaves an existing table untouched, so a database
# file created with the older three-column shipping layout is upgraded here.
# Columns are appended in the order get_shipping() expects (indexes 3..6).
_shipping_columns = {row[1] for row in c.execute("PRAGMA table_info(shipping)")}
for _column in ("weight", "length", "width", "height"):
    if _column not in _shipping_columns:
        c.execute(f"ALTER TABLE shipping ADD COLUMN {_column} REAL")
conn.commit()
# Sample payloads: one order, one customer inquiry, and one shipping request,
# bundled together under `order`.
order_details = dict(
    customer_id=1234,
    items=[
        {'product_id': sku, 'quantity': qty}
        for sku, qty in (('SKU010', 2), ('SKU006', 1))
    ],
)
customer_inquiry = dict(customer_id=5678, issue='Order status inquiry')
shipping_details = dict(order_id=9876, shipping_address='123 Main St, Anytown USA')
order = dict(
    order_details=order_details,
    customer_inquiry=customer_inquiry,
    shipping_details=shipping_details,
)
def create_customer(customer_data):
    """Insert a customer row and return its id.

    Args:
        customer_data: Mapping read with ``.get`` for "name" and "email";
            an absent key is passed to the INSERT as ``None``.

    Returns:
        The shared cursor's ``lastrowid`` after the insert is committed.
    """
    row_values = (customer_data.get("name"), customer_data.get("email"))
    c.execute("INSERT INTO customers (name, email) VALUES (?, ?)", row_values)
    conn.commit()
    return c.lastrowid
def get_customer(customer_id):
    """Look up one customer by primary key.

    Returns:
        A dict with "id", "name" and "email", or ``None`` when no row matches.
    """
    c.execute("SELECT * FROM customers WHERE id = ?", (customer_id,))
    row = c.fetchone()
    if not row:
        return None
    return {"id": row[0], "name": row[1], "email": row[2]}
def create_order(order_data):
    """Insert an order row and return its id.

    Args:
        order_data: Mapping read with ``.get`` for "customer_id", "items" and
            "total". The items value is stored as its ``str()`` text.

    Returns:
        The shared cursor's ``lastrowid`` after the insert is committed.
    """
    row_values = (
        order_data.get("customer_id"),
        str(order_data.get("items")),  # serialized as Python repr text
        order_data.get("total"),
    )
    c.execute("INSERT INTO orders (customer_id, items, total) VALUES (?, ?, ?)", row_values)
    conn.commit()
    return c.lastrowid
def get_order(order_id):
    """Look up one order by primary key.

    Args:
        order_id: Value matched against ``orders.id``.

    Returns:
        A dict with "id", "customer_id", "items" and "total", or ``None``
        when no row matches. "items" is the stored text parsed back into
        Python literals; if the text is not a valid literal it is returned
        unparsed.
    """
    c.execute("SELECT * FROM orders WHERE id = ?", (order_id,))
    row = c.fetchone()
    if not row:
        return None

    stored_items = row[2]
    # SECURITY: this text originates from tool input written by create_order(),
    # so it must never reach eval(). literal_eval only accepts Python literals
    # (lists, dicts, strings, numbers, ...) and cannot execute code.
    try:
        items = ast.literal_eval(stored_items)
    except (ValueError, SyntaxError, TypeError):
        # Not a literal (or NULL): hand back the raw value instead of crashing.
        items = stored_items

    return {"id": row[0], "customer_id": row[1], "items": items, "total": row[3]}
def process_payment(payment_data):
    """Record a payment row for an order and return its id.

    Args:
        payment_data: Mapping read with ``.get`` for "order_id" and "amount".

    Returns:
        The shared cursor's ``lastrowid`` after the insert is committed.
    """
    # Implement payment processing logic using a payment gateway
    status = "paid"  # Replace with actual payment status
    row_values = (payment_data.get("order_id"), payment_data.get("amount"), status)
    c.execute("INSERT INTO payments (order_id, amount, status) VALUES (?, ?, ?)", row_values)
    conn.commit()
    return c.lastrowid
def get_payment(order_id):
    """Fetch the first payment row recorded for an order.

    Returns:
        A dict with "id", "order_id", "amount" and "status", or ``None`` when
        the order has no payment row.
    """
    c.execute("SELECT * FROM payments WHERE order_id = ?", (order_id,))
    row = c.fetchone()
    if not row:
        return None
    return {"id": row[0], "order_id": row[1], "amount": row[2], "status": row[3]}
def update_inventory(order_data):
    """Subtract each ordered quantity from the inventory table.

    Args:
        order_data: Mapping whose optional "items" list holds dicts with
            "product_id" and "quantity".

    A product with no inventory row gets one created with the negated
    quantity. Changes are committed after all items are processed.
    """
    for line_item in order_data.get("items", []):
        sku = line_item["product_id"]
        ordered = line_item["quantity"]
        c.execute("SELECT quantity FROM inventory WHERE product_id = ?", (sku,))
        stock_row = c.fetchone()
        if not stock_row:
            # Unknown product: start its stock level below zero.
            c.execute("INSERT INTO inventory (product_id, quantity) VALUES (?, ?)", (sku, -ordered))
            continue
        remaining = stock_row[0] - ordered
        c.execute("UPDATE inventory SET quantity = ? WHERE product_id = ?", (remaining, sku))
    conn.commit()
class CustomerManagementTool(BaseTool):
    """Tool that creates a customer or looks one up by id."""

    name: str | None = "Customer Management"
    description: str | None = "A tool to manage customers."

    def _run(self, **kwargs):
        """Dispatch on the keyword supplied.

        ``customer_data`` creates a customer; otherwise ``customer_id`` looks
        one up. Returns a human-readable status string in every case.
        """
        if "customer_data" in kwargs:
            new_id = create_customer(kwargs["customer_data"])
            return f"Customer created successfully with ID {new_id}."

        if "customer_id" not in kwargs:
            return "Invalid input. Please provide either 'customer_data' or 'customer_id'."

        customer_id = kwargs["customer_id"]
        record = get_customer(customer_id)
        if record:
            return f"Customer details: {record}"
        return f"Customer with ID {customer_id} not found."
class OrderManagementTool(BaseTool):
    """Tool that creates an order (and adjusts inventory) or looks one up."""

    name: str | None = "Order Management"
    description: str | None = "A tool to manage orders."

    def _run(self, **kwargs):
        """Dispatch on the keyword supplied.

        ``order_data`` stores the order and then updates inventory for its
        items; otherwise ``order_id`` looks an order up. Returns a
        human-readable status string in every case.
        """
        if "order_data" in kwargs:
            payload = kwargs["order_data"]
            new_id = create_order(payload)
            update_inventory(payload)
            return f"Order created successfully with ID {new_id}."

        if "order_id" not in kwargs:
            return "Invalid input. Please provide either 'order_data' or 'order_id'."

        order_id = kwargs["order_id"]
        record = get_order(order_id)
        if record:
            return f"Order details: {record}"
        return f"Order with ID {order_id} not found."
class PaymentProcessingTool(BaseTool):
    """Tool that records a payment or looks up the payment for an order."""

    name: str | None = "Payment Processing"
    description: str | None = "A tool to process payments."

    def _run(self, **kwargs):
        """Dispatch on the keyword supplied.

        ``payment_data`` records a payment; otherwise ``order_id`` looks up
        the payment for that order. Returns a human-readable status string in
        every case.
        """
        if "payment_data" in kwargs:
            new_id = process_payment(kwargs["payment_data"])
            return f"Payment processed successfully with ID {new_id}."

        if "order_id" not in kwargs:
            return "Invalid input. Please provide either 'payment_data' or 'order_id'."

        order_id = kwargs["order_id"]
        record = get_payment(order_id)
        if record:
            return f"Payment details: {record}"
        return f"Payment not found for order with ID {order_id}."
class InventoryManagementTool(BaseTool):
    """Tool that reports the current stock level of every product."""

    name: str | None = "Inventory Management"
    description: str | None = "A tool to manage inventory."

    def _run(self, **kwargs):
        """Return a text report with one "product_id: quantity" line per row.

        Keyword arguments are accepted but not used.
        """
        # Select exactly the two reported columns: the table also has an `id`
        # column, so `SELECT *` rows cannot be unpacked into two names.
        c.execute("SELECT product_id, quantity FROM inventory")
        report_lines = [f"{product_id}: {quantity}\n" for product_id, quantity in c.fetchall()]
        return "Current inventory:\n" + "".join(report_lines)
class ShippingTool(BaseTool):
    """Tool that looks up or creates the shipping record of an order."""

    name: str | None = "Shipping"
    description: str | None = "A tool to handle shipping and delivery."

    def _run(self, **kwargs):
        """Dispatch on the keyword supplied.

        ``order_id`` takes precedence and looks up the shipping record;
        otherwise ``shipping_data`` (with "order_id", "recipient_address" and
        "package_details") creates one. Returns a human-readable status
        string in every case.
        """
        if "order_id" in kwargs:
            order_id = kwargs["order_id"]
            record = get_shipping(order_id)
            if record:
                return f"Shipping details: {record}"
            return f"Shipping details not found for order with ID {order_id}."

        if "shipping_data" not in kwargs:
            return "Invalid input. Please provide either 'order_id' or 'shipping_data'."

        request = kwargs["shipping_data"]
        order_id = request.get("order_id")
        recipient_address = request.get("recipient_address")
        package_details = request.get("package_details")
        # All three parts must be present and non-empty.
        if not (order_id and recipient_address and package_details):
            return "Invalid shipping data. Please provide 'order_id', 'recipient_address', and 'package_details'."

        new_id = create_shipping(order_id, recipient_address, package_details)
        return f"Shipping details created with ID {new_id}."
def create_shipping(order_id, recipient_address, package_details):
    """Insert a shipping row for an order and return its id.

    Args:
        order_id: Order the shipment belongs to.
        recipient_address: Mapping read with ``.get`` for "name", "street",
            "city", "state" and "zip".
        package_details: Mapping read with ``.get`` for "weight" and an
            optional "dimensions" mapping ("length", "width", "height").
            A missing or ``None`` "dimensions" stores ``None`` for all three.

    Returns:
        The shared cursor's ``lastrowid`` after the insert is committed.

    The shipping table must provide weight, length, width and height columns
    for this INSERT to succeed.
    """
    # `or {}` also covers an explicit `"dimensions": None`, which a plain
    # .get("dimensions", {}) would pass through and then fail on .get().
    dimensions = package_details.get("dimensions") or {}

    name = recipient_address.get("name")
    street = recipient_address.get("street")
    city = recipient_address.get("city")
    state = recipient_address.get("state")
    zip_code = recipient_address.get("zip")
    shipping_address = f"{name}, {street}, {city}, {state} {zip_code}"

    c.execute(
        "INSERT INTO shipping (order_id, shipping_address, weight, length, width, height) VALUES (?, ?, ?, ?, ?, ?)",
        (
            order_id,
            shipping_address,
            package_details.get("weight"),
            dimensions.get("length"),
            dimensions.get("width"),
            dimensions.get("height"),
        ),
    )
    conn.commit()
    return c.lastrowid
def get_shipping(order_id):
    """Fetch the first shipping row recorded for an order.

    Args:
        order_id: Value matched against ``shipping.order_id``.

    Returns:
        A dict with "id", "order_id", "shipping_address", "weight", "length",
        "width" and "height", or ``None`` when the order has no shipping row.
        Fields the fetched row does not contain are reported as ``None``.
    """
    c.execute("SELECT * FROM shipping WHERE order_id = ?", (order_id,))
    row = c.fetchone()
    if not row:
        return None

    fields = ("id", "order_id", "shipping_address", "weight", "length", "width", "height")
    # Pad short rows so a table without the package columns yields None values
    # instead of an IndexError on positions 3..6.
    padded = tuple(row) + (None,) * (len(fields) - len(row))
    return dict(zip(fields, padded))
# One shared instance of each custom tool; the agents reference these objects
# in their `tools` lists.
shipping_tool = ShippingTool()
inventory_management_tool = InventoryManagementTool()
payment_processing_tool = PaymentProcessingTool()
order_management_tool = OrderManagementTool()
customer_management_tool = CustomerManagementTool()
# Step callbacks: each one prints an agent's step output, or a finish notice,
# to stdout.
def order_callback(output):
    """Print progress for the Order agent.

    Prints a finish notice when ``output`` is an ``AgentFinish``; otherwise
    prints the first element of ``output`` and, if that element's own first
    item is truthy, its ``.log`` attribute.
    """
    # presumably a sequence of (action, observation) pairs — verify against CrewAI
    if isinstance(output, langchain_core.agents.AgentFinish):
        print("Order Agent finished")
        return
    if not (output and output[0]):
        return
    first_step = output[0]
    print(first_step)
    if first_step[0]:
        print(first_step[0].log)
def customer_callback(output):
    """Print progress for the Customer agent.

    Prints a finish notice when ``output`` is an ``AgentFinish``; otherwise
    prints the first element of ``output`` and, if that element's own first
    item is truthy, its ``.log`` attribute.
    """
    # presumably a sequence of (action, observation) pairs — verify against CrewAI
    if isinstance(output, langchain_core.agents.AgentFinish):
        print("Customer Agent finished")
        return
    if not (output and output[0]):
        return
    first_step = output[0]
    print(first_step)
    if first_step[0]:
        print(first_step[0].log)
def inventory_callback(output):
    """Print progress for the Inventory agent.

    Prints a finish notice when ``output`` is an ``AgentFinish``; otherwise
    prints the first element of ``output`` and, if that element's own first
    item is truthy, its ``.log`` attribute.
    """
    # presumably a sequence of (action, observation) pairs — verify against CrewAI
    if isinstance(output, langchain_core.agents.AgentFinish):
        print("Inventory Agent finished")
        return
    if not (output and output[0]):
        return
    first_step = output[0]
    print(first_step)
    if first_step[0]:
        print(first_step[0].log)
def logistics_callback(output):
    """Print progress for the Logistics agent.

    Prints a finish notice when ``output`` is an ``AgentFinish``; otherwise
    prints the first element of ``output`` and, if that element's own first
    item is truthy, its ``.log`` attribute.
    """
    # presumably a sequence of (action, observation) pairs — verify against CrewAI
    if isinstance(output, langchain_core.agents.AgentFinish):
        print("Logistics Agent finished")
        return
    if not (output and output[0]):
        return
    first_step = output[0]
    print(first_step)
    if first_step[0]:
        print(first_step[0].log)
def payment_callback(output):
    """Print progress for the Payment agent.

    Prints a finish notice when ``output`` is an ``AgentFinish``; otherwise
    prints the first element of ``output`` and, if that element's own first
    item is truthy, its ``.log`` attribute.
    """
    # presumably a sequence of (action, observation) pairs — verify against CrewAI
    if isinstance(output, langchain_core.agents.AgentFinish):
        # This callback belongs to the payment agent, so name it correctly
        # (the message previously said "Logistics Agent finished").
        print("Payment Agent finished")
    elif output and output[0]:
        print(output[0])
        if output[0][0]:
            print(output[0][0].log)
# Define the Order Agent: works with the order-management tool and reports
# each step through order_callback.
order_agent = Agent(
    role='Order Manager',
    goal='Manage and process customer orders efficiently',
    # Adjacent literals are concatenated verbatim, so each fragment ends with
    # a space to keep the sentences from running together in the prompt.
    backstory=(
        "You are the Order Manager at an e-commerce company. "
        "Your responsibilities include receiving and processing customer orders, "
        "ensuring that orders are accurate and complete, and coordinating with other teams to fulfill orders. "
        "You have access to the company's order management system."
    ),
    tools=[order_management_tool],
    max_rpm=100,
    step_callback=order_callback
)
# Task assigned to the Order agent.
order_task = Task(
    # Each fragment ends with a space so the concatenated sentences stay separated.
    description=(
        "Process a new customer order. "
        "Ensure the order details are accurate and complete. "
        "Coordinate with other teams to fulfill the order."
    ),
    expected_output="Customer order processed and coordinated for fulfillment.",
    agent=order_agent
)
# Define the Customer Agent: works with the customer-management tool and
# reports each step through customer_callback.
customer_agent = Agent(
    role='Customer Support',
    goal='Provide excellent customer service and resolve inquiries quickly',
    # Adjacent literals are concatenated verbatim, so each fragment ends with
    # a space to keep the sentences from running together in the prompt.
    backstory=(
        "You are a Customer Support Agent at an e-commerce company. "
        "Your role is to assist customers with their inquiries, provide accurate information, "
        "and resolve any issues they may have. "
        "You have access to the company's customer support system, knowledge base, order information, and issue tracking systems."
    ),
    tools=[customer_management_tool],
    max_rpm=100,
    step_callback=customer_callback
)
# Task assigned to the Customer agent.
customer_task = Task(
    # Each fragment ends with a space so the concatenated sentences stay separated.
    description=(
        "Handle the incoming customer inquiry. "
        "Provide accurate information and resolve the issue. "
        "Document the interaction in the customer support system."
    ),
    expected_output="Customer inquiry resolved, information provided, and interaction documented.",
    agent=customer_agent
)
# Define the Payment Agent: works with the payment-processing tool and
# reports each step through payment_callback.
payment_agent = Agent(
    role='Payment Processor',
    goal='Process customer payments securely',
    # Adjacent literals are concatenated verbatim, so each fragment ends with
    # a space to keep the sentences from running together in the prompt.
    backstory=(
        "You are the Payment Processor at an e-commerce company. "
        "Your role is to process customer payments using a secure payment gateway. "
        "You have access to the payment processing system."
    ),
    tools=[payment_processing_tool],
    max_rpm=100,
    step_callback=payment_callback
)
# Task assigned to the Payment agent.
payment_task = Task(
    # Each fragment ends with a space so the concatenated sentences stay separated.
    description=(
        "Process the payment for a customer order. "
        "Ensure the payment is processed securely and accurately. "
        "Update the order status and notify the customer."
    ),
    expected_output="Payment processed successfully, order status updated, and customer notified.",
    agent=payment_agent
)
# Define the Inventory Agent: works with the inventory-management tool and
# reports each step through inventory_callback.
inventory_agent = Agent(
    role='Inventory Manager',
    goal='Maintain accurate inventory levels and ensure product availability',
    # Adjacent literals are concatenated verbatim, so each fragment ends with
    # a space to keep the sentences from running together in the prompt.
    backstory=(
        "You are the Inventory Manager at an e-commerce company. "
        "Your role is to monitor and manage inventory levels, reorder products when needed, "
        "and ensure that the warehouse is well-stocked. "
        "You have access to the company's inventory management system."
    ),
    tools=[inventory_management_tool],
    max_rpm=100,
    step_callback=inventory_callback
)
# Task assigned to the Inventory agent.
inventory_task = Task(
    # Each fragment ends with a space so the concatenated sentences stay separated.
    description=(
        "Monitor inventory levels and reorder products as needed. "
        "Coordinate with suppliers for timely restocking. "
        "Optimize stock levels based on demand and sales data."
    ),
    expected_output="Inventory levels optimized, and restocking coordinated with suppliers.",
    agent=inventory_agent
)
# Define the Logistics Agent: works with the shipping tool and reports each
# step through logistics_callback.
logistics_agent = Agent(
    role='Logistics Coordinator',
    goal='Ensure timely and accurate shipping of customer orders',
    # Adjacent literals are concatenated verbatim, so each fragment ends with
    # a space to keep the sentences from running together in the prompt.
    backstory=(
        "You are the Logistics Coordinator at an e-commerce company. "
        "Your role is to manage the shipping process, coordinate with carriers, "
        "and track shipments to ensure timely delivery. "
        "You have access to the company's order management system and carrier integrations."
    ),
    tools=[shipping_tool],
    max_rpm=100,
    step_callback=logistics_callback
)
# Task assigned to the Logistics agent.
logistics_task = Task(
    # Each fragment ends with a space so the concatenated sentences stay separated.
    description=(
        "Coordinate the shipping of the processed order. "
        "Select the appropriate carrier and shipping method. "
        "Track the shipment and provide updates to the customer."
    ),
    expected_output="Order shipped successfully, and tracking information provided to the customer.",
    agent=logistics_agent
)
class ManagerTool(BaseTool):
    """Placeholder tool for the manager agent; it only returns a fixed reply."""

    name: str | None = "Manager Tool"
    description: str | None = "A tool for the Manager Agent to coordinate other agents and handle business processes."

    def _run(self, **kwargs) -> str:
        """Ignore all keyword arguments and return a canned acknowledgement."""
        acknowledgement = "I understand. I have done what is necessary."
        return acknowledgement
# Manager agent that oversees the specialised agents and may hand work to them.
manager_agent = Agent(
    role="Business Operations Manager",
    goal="Coordinate and manage business processes efficiently",
    # Adjacent literals are concatenated verbatim, so each fragment ends with
    # a space. The delegate list names every specialised agent in the crew,
    # including the payment agent.
    backstory=(
        "You are the Business Operations Manager at an e-commerce company. "
        "Your role is to oversee and coordinate various business processes, "
        "including order processing, customer support, inventory management, and logistics. "
        "You have access to a tool that allows you to delegate tasks to specialized agents. "
        "The agents you can delegate to are order_agent, customer_agent, payment_agent, inventory_agent, and logistics_agent"
    ),
    # CrewAI's Agent field for delegation is `allow_delegation`; `delegate`
    # is not a recognised option.
    allow_delegation=True,
    tools=[ManagerTool()],
    max_rpm=100,
)
# Top-level coordination task owned by the manager agent.
manager_task = Task(
    agent=manager_agent,
    expected_output="Business processes executed successfully.",
    description="Manage and coordinate the appropriate business processes based on the given input.",
)
# Assemble the crew under the hierarchical process and run it at import time.
crew = Crew(
    name="eCommerce Team",
    process=Process.hierarchical,  # Specifies the hierarchical management approach
    # Mandatory for hierarchical process.
    # NOTE(review): "amazon-embeddings" reads like an embeddings model name —
    # confirm the gateway serves a chat model under it; the module-level `llm`
    # is not referenced here.
    manager_llm=ChatOpenAI(
        model="amazon-embeddings",
        base_url="http://chat.the.mk:1337",
        temperature=0.5,
    ),
    # NOTE(review): manager_agent is listed as a regular member while
    # manager_llm is also set — confirm this is intended for hierarchical mode.
    agents=[
        manager_agent,
        order_agent,
        customer_agent,
        payment_agent,
        inventory_agent,
        logistics_agent,
    ],
    tasks=[
        manager_task,
        order_task,
        customer_task,
        payment_task,
        inventory_task,
        logistics_task,
    ],
)

# Start the run; the result is not captured.
crew.kickoff()