agent loop is ready
This commit is contained in:
@@ -1,8 +1,8 @@
|
||||
from google.genai import types
|
||||
from functions.get_file_content import schema_get_file_content
|
||||
from functions.get_files_info import schema_get_files_info
|
||||
from functions.run_python_file import schema_run_python_file
|
||||
from functions.write_file import schema_write_file
|
||||
from functions.get_file_content import schema_get_file_content, get_file_content
|
||||
from functions.get_files_info import schema_get_files_info, get_files_info
|
||||
from functions.run_python_file import schema_run_python_file, run_python_file
|
||||
from functions.write_file import schema_write_file, write_file
|
||||
|
||||
available_functions = types.Tool(
|
||||
function_declarations=[
|
||||
@@ -12,3 +12,41 @@ available_functions = types.Tool(
|
||||
schema_run_python_file
|
||||
],
|
||||
)
|
||||
|
||||
def call_function(function_call, verbose=False):
|
||||
if verbose:
|
||||
print(f"Calling function: {function_call.name}({function_call.args})")
|
||||
print(f" - Calling function: {function_call.name}")
|
||||
|
||||
function_map = {
|
||||
"get_file_content": get_file_content,
|
||||
"get_files_info": get_files_info,
|
||||
"run_python_file": run_python_file,
|
||||
"write_file": write_file,
|
||||
}
|
||||
|
||||
function_name = function_call.name or ""
|
||||
if function_name == "":
|
||||
return types.Content(
|
||||
role="tool",
|
||||
parts=[
|
||||
types.Part.from_function_response(
|
||||
name=function_name,
|
||||
response={"error": f"Unknown function: {function_name}"},
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
args = dict(function_call.args) if function_call.args else {}
|
||||
args["working_directory"] = "./calculator"
|
||||
function_result = function_map[function_name](**args)
|
||||
return types.Content(
|
||||
role="tool",
|
||||
parts=[
|
||||
types.Part.from_function_response(
|
||||
name=function_name,
|
||||
response={"result": function_result},
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from google import genai
|
||||
from google.genai import types
|
||||
from prompts import system_prompt
|
||||
from functions.call_function import available_functions
|
||||
from functions.call_function import available_functions, call_function
|
||||
|
||||
load_dotenv()
|
||||
api_key = os.environ.get("GEMINI_API_KEY")
|
||||
@@ -19,25 +20,48 @@ parser.add_argument("--verbose", action="store_true", help="Enable verbose outpu
|
||||
args = parser.parse_args()
|
||||
messages = [types.Content(role="user", parts=[types.Part(text=args.user_prompt)])]
|
||||
|
||||
response = client.models.generate_content(
|
||||
model="gemini-2.5-flash",
|
||||
contents=messages,
|
||||
config=types.GenerateContentConfig(
|
||||
system_instruction=system_prompt,
|
||||
tools=[available_functions],
|
||||
),
|
||||
)
|
||||
if not response.usage_metadata:
|
||||
raise RuntimeError("Cannot get usage metadata")
|
||||
for _ in range(10):
|
||||
response = client.models.generate_content(
|
||||
model="gemini-2.5-flash",
|
||||
contents=messages,
|
||||
config=types.GenerateContentConfig(
|
||||
system_instruction=system_prompt,
|
||||
tools=[available_functions],
|
||||
),
|
||||
)
|
||||
if response.candidates:
|
||||
for candidate in response.candidates:
|
||||
if candidate.content:
|
||||
messages.append(candidate.content)
|
||||
|
||||
if args.verbose:
|
||||
print(f"User prompt: {args.user_prompt}")
|
||||
print(f"Prompt tokens: {response.usage_metadata.prompt_token_count}")
|
||||
print(f"Response tokens: {response.usage_metadata.candidates_token_count}")
|
||||
if not response.usage_metadata:
|
||||
raise RuntimeError("Cannot get usage metadata")
|
||||
|
||||
function_calls = response.function_calls
|
||||
if function_calls:
|
||||
for function_call in function_calls:
|
||||
print(f"Calling function: {function_call.name}({function_call.args})")
|
||||
if args.verbose:
|
||||
print(f"User prompt: {args.user_prompt}")
|
||||
print(f"Prompt tokens: {response.usage_metadata.prompt_token_count}")
|
||||
print(f"Response tokens: {response.usage_metadata.candidates_token_count}")
|
||||
|
||||
function_calls = response.function_calls
|
||||
function_responses = []
|
||||
if function_calls:
|
||||
for function_call in function_calls:
|
||||
function_call_result = call_function(function_call, args.verbose)
|
||||
|
||||
if not function_call_result.parts:
|
||||
raise Exception("function_call_result.parts list is empty")
|
||||
if function_call_result.parts[0].function_response is None:
|
||||
raise Exception("FunctionResponse object is equal to None")
|
||||
if function_call_result.parts[0].function_response.response is None:
|
||||
raise Exception("FunctionResponse object response is equal to None")
|
||||
|
||||
function_responses.append(function_call_result.parts[0])
|
||||
if args.verbose:
|
||||
print(f"-> {function_call_result.parts[0].function_response.response}")
|
||||
|
||||
messages.append(types.Content(role="user", parts=function_responses))
|
||||
else:
|
||||
break
|
||||
else:
|
||||
print(response.text)
|
||||
print("Maximum iterations reached without a final response")
|
||||
sys.exit(1)
|
||||
|
||||
Reference in New Issue
Block a user