68 lines
2.5 KiB
Python
68 lines
2.5 KiB
Python
import argparse
|
|
import os
|
|
import sys
|
|
|
|
from dotenv import load_dotenv
|
|
from google import genai
|
|
from google.genai import types
|
|
from prompts import system_prompt
|
|
from functions.call_function import available_functions, call_function
|
|
|
|
load_dotenv()
|
|
api_key = os.environ.get("GEMINI_API_KEY")
|
|
if not api_key:
|
|
raise RuntimeError("API Key not found")
|
|
|
|
client = genai.Client(api_key=api_key)
|
|
parser = argparse.ArgumentParser(description="Chatbot")
|
|
parser.add_argument("user_prompt", type=str, help="User prompt")
|
|
parser.add_argument("--verbose", action="store_true", help="Enable verbose output")
|
|
args = parser.parse_args()
|
|
messages = [types.Content(role="user", parts=[types.Part(text=args.user_prompt)])]
|
|
|
|
for _ in range(10):
|
|
response = client.models.generate_content(
|
|
model="gemini-2.5-flash",
|
|
contents=messages,
|
|
config=types.GenerateContentConfig(
|
|
system_instruction=system_prompt,
|
|
tools=[available_functions],
|
|
),
|
|
)
|
|
if response.candidates:
|
|
for candidate in response.candidates:
|
|
if candidate.content:
|
|
messages.append(candidate.content)
|
|
|
|
if not response.usage_metadata:
|
|
raise RuntimeError("Cannot get usage metadata")
|
|
|
|
if args.verbose:
|
|
print(f"User prompt: {args.user_prompt}")
|
|
print(f"Prompt tokens: {response.usage_metadata.prompt_token_count}")
|
|
print(f"Response tokens: {response.usage_metadata.candidates_token_count}")
|
|
|
|
function_calls = response.function_calls
|
|
function_responses = []
|
|
if function_calls:
|
|
for function_call in function_calls:
|
|
function_call_result = call_function(function_call, args.verbose)
|
|
|
|
if not function_call_result.parts:
|
|
raise Exception("function_call_result.parts list is empty")
|
|
if function_call_result.parts[0].function_response is None:
|
|
raise Exception("FunctionResponse object is equal to None")
|
|
if function_call_result.parts[0].function_response.response is None:
|
|
raise Exception("FunctionResponse object response is equal to None")
|
|
|
|
function_responses.append(function_call_result.parts[0])
|
|
if args.verbose:
|
|
print(f"-> {function_call_result.parts[0].function_response.response}")
|
|
|
|
messages.append(types.Content(role="user", parts=function_responses))
|
|
else:
|
|
break
|
|
else:
|
|
print("Maximum iterations reached without a final response")
|
|
sys.exit(1)
|