From 4cda5f0e75a48d5a76bf3d7c3edb6dc720eff700 Mon Sep 17 00:00:00 2001 From: xtarzyk Date: Mon, 30 Mar 2026 20:52:19 +0200 Subject: [PATCH] agent loop is ready --- functions/call_function.py | 48 ++++++++++++++++++++++++--- main.py | 66 ++++++++++++++++++++++++++------------ 2 files changed, 88 insertions(+), 26 deletions(-) diff --git a/functions/call_function.py b/functions/call_function.py index 97cce19..a6675d3 100644 --- a/functions/call_function.py +++ b/functions/call_function.py @@ -1,8 +1,8 @@ from google.genai import types -from functions.get_file_content import schema_get_file_content -from functions.get_files_info import schema_get_files_info -from functions.run_python_file import schema_run_python_file -from functions.write_file import schema_write_file +from functions.get_file_content import schema_get_file_content, get_file_content +from functions.get_files_info import schema_get_files_info, get_files_info +from functions.run_python_file import schema_run_python_file, run_python_file +from functions.write_file import schema_write_file, write_file available_functions = types.Tool( function_declarations=[ @@ -11,4 +11,42 @@ available_functions = types.Tool( schema_write_file, schema_run_python_file ], -) \ No newline at end of file +) + +def call_function(function_call, verbose=False): + if verbose: + print(f"Calling function: {function_call.name}({function_call.args})") + print(f" - Calling function: {function_call.name}") + + function_map = { + "get_file_content": get_file_content, + "get_files_info": get_files_info, + "run_python_file": run_python_file, + "write_file": write_file, + } + + function_name = function_call.name or "" + if function_name == "": + return types.Content( + role="tool", + parts=[ + types.Part.from_function_response( + name=function_name, + response={"error": f"Unknown function: {function_name}"}, + ) + ], + ) + + args = dict(function_call.args) if function_call.args else {} + args["working_directory"] = "./calculator" + function_result = function_map[function_name](**args) + return types.Content( + role="tool", + parts=[ + types.Part.from_function_response( + name=function_name, + response={"result": function_result}, + ) + ], + ) + \ No newline at end of file diff --git a/main.py b/main.py index d90ef10..b8c223c 100644 --- a/main.py +++ b/main.py @@ -1,11 +1,12 @@ import argparse import os +import sys from dotenv import load_dotenv from google import genai from google.genai import types from prompts import system_prompt -from functions.call_function import available_functions +from functions.call_function import available_functions, call_function load_dotenv() api_key = os.environ.get("GEMINI_API_KEY") @@ -19,25 +20,48 @@ parser.add_argument("--verbose", action="store_true", help="Enable verbose outpu args = parser.parse_args() messages = [types.Content(role="user", parts=[types.Part(text=args.user_prompt)])] -response = client.models.generate_content( - model="gemini-2.5-flash", - contents=messages, - config=types.GenerateContentConfig( - system_instruction=system_prompt, - tools=[available_functions], - ), -) -if not response.usage_metadata: - raise RuntimeError("Cannot get usage metadata") +for _ in range(10): + response = client.models.generate_content( + model="gemini-2.5-flash", + contents=messages, + config=types.GenerateContentConfig( + system_instruction=system_prompt, + tools=[available_functions], + ), + ) + if response.candidates: + for candidate in response.candidates: + if candidate.content: + messages.append(candidate.content) -if args.verbose: - print(f"User prompt: {args.user_prompt}") - print(f"Prompt tokens: {response.usage_metadata.prompt_token_count}") - print(f"Response tokens: {response.usage_metadata.candidates_token_count}") - -function_calls = response.function_calls -if function_calls: - for function_call in function_calls: - print(f"Calling function: {function_call.name}({function_call.args})") + if not response.usage_metadata: + raise RuntimeError("Cannot get usage metadata") + + if args.verbose: + print(f"User prompt: {args.user_prompt}") + print(f"Prompt tokens: {response.usage_metadata.prompt_token_count}") + print(f"Response tokens: {response.usage_metadata.candidates_token_count}") + + function_calls = response.function_calls + function_responses = [] + if function_calls: + for function_call in function_calls: + function_call_result = call_function(function_call, args.verbose) + + if not function_call_result.parts: + raise Exception("function_call_result.parts list is empty") + if function_call_result.parts[0].function_response is None: + raise Exception("FunctionResponse object is equal to None") + if function_call_result.parts[0].function_response.response is None: + raise Exception("FunctionResponse object response is equal to None") + + function_responses.append(function_call_result.parts[0]) + if args.verbose: + print(f"-> {function_call_result.parts[0].function_response.response}") + + messages.append(types.Content(role="user", parts=function_responses)) + else: + break else: - print(response.text) + print("Maximum iterations reached without a final response") + sys.exit(1)