Long-Running AOP Execution

This guide shows how to execute Agent Operating Procedures (AOPs) using the Python SDK with asynchronous execution and proper polling for completion. This is the recommended approach for production applications where AOPs may take minutes or longer to complete.

Why Async Execution?

When you call execute_async(), it returns immediately with a thread_id while the AOP continues running in the background. You must poll threads.get_status() in a loop to know when execution finishes. Without polling, you cannot tell whether the AOP completed, failed, or is still running.

Key features:

  • Non-blocking execution - execute_async() returns immediately with a thread ID
  • Polling-based completion tracking - Use threads.get_status() in a loop to wait for results
  • Configurable timeouts - Set appropriate timeouts for your workflow complexity
  • Production-ready patterns - Error handling, retries, and batch processing

execute_async() returns immediately with a thread_id and does not wait for completion. You must implement a polling loop using threads.get_status() to wait for the AOP to finish.

1

Install Package

!pip install -U athena-intelligence
2

Set Up Client

import os
import time
from athena.client import Athena

client = Athena(api_key=os.environ["ATHENA_API_KEY"])
3

Start Async Execution

Call execute_async() to start the AOP. This returns immediately with a thread_id you use to track progress.

response = client.aop.execute_async(
    asset_id="asset_xxx",
    user_inputs={"company": "Acme Corp"}
)

thread_id = response.thread_id
print(f"Started AOP: {response.aop_title}")
print(f"Thread ID: {thread_id}")
print(f"Status: {response.status}")

At this point the AOP is running in the background. The response.status will indicate it has started, but the work is not done yet.

4

Poll for Completion

You must check threads.get_status() in a loop until the status is completed or failed.

timeout = 3600      # Maximum total wait, in seconds
poll_interval = 5   # Seconds between status checks
start_time = time.time()

while True:
    # Give up once the overall timeout has elapsed
    if time.time() - start_time > timeout:
        raise TimeoutError(f"AOP execution timed out after {timeout}s")

    status = client.threads.get_status(thread_id=thread_id)

    # "completed" and "failed" are terminal; anything else means keep waiting
    if status.status == "completed":
        print("AOP completed successfully!")
        break
    elif status.status == "failed":
        print("AOP execution failed")
        break

    time.sleep(poll_interval)

print(f"Final status: {status.status}")
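
The fixed 5-second interval above is simple, but an AOP that runs for the better part of an hour will trigger hundreds of status calls. The following is a minimal sketch of the same loop with a growing poll interval, not an SDK feature. The names poll_until_terminal, fetch_status, max_interval, and backoff are hypothetical. fetch_status stands in for any zero-argument callable that returns the current status string, for example lambda: client.threads.get_status(thread_id=thread_id).status.

```python
import time
from typing import Callable

# The two terminal statuses named in this guide.
TERMINAL_STATUSES = {"completed", "failed"}


def poll_until_terminal(
    fetch_status: Callable[[], str],
    timeout: float = 3600.0,
    initial_interval: float = 5.0,
    max_interval: float = 60.0,
    backoff: float = 1.5,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call fetch_status() until it returns a terminal status or time runs out."""
    # time.monotonic() is unaffected by system clock adjustments.
    deadline = time.monotonic() + timeout
    interval = initial_interval

    while True:
        status = fetch_status()
        if status in TERMINAL_STATUSES:
            return status

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"Still '{status}' after {timeout}s")

        # Never sleep past the deadline, then grow the interval up to the cap.
        sleep(min(interval, remaining))
        interval = min(interval * backoff, max_interval)
```

With the defaults, the wait between checks grows from 5 seconds toward a 60-second cap, so short AOPs are still detected quickly while long ones are polled less often. The sleep parameter exists only so the function can be exercised in tests without real delays.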
6

Production-Ready Helper Function

Wrap the full pattern into a reusable function:

import time
from athena.client import Athena


def execute_aop_with_polling(
    client: Athena,
    aop_asset_id: str,
    user_inputs: dict,
    timeout_seconds: int = 3600,
    poll_interval: int = 5
):
    """Execute an AOP and poll until completion.

    Returns the final status response on success. Raises TimeoutError if
    timeout_seconds elapses first, or RuntimeError if the AOP fails.
    """
    response = client.aop.execute_async(
        asset_id=aop_asset_id,
        user_inputs=user_inputs
    )

    thread_id = response.thread_id
    start_time = time.time()

    while True:
        elapsed = time.time() - start_time

        if elapsed > timeout_seconds:
            raise TimeoutError(
                f"AOP execution timed out after {elapsed:.1f}s"
            )

        status_response = client.threads.get_status(
            thread_id=thread_id
        )

        if status_response.status == "completed":
            return status_response
        elif status_response.status == "failed":
            raise RuntimeError(
                f"AOP execution failed for thread {thread_id}"
            )

        time.sleep(poll_interval)

Usage:

client = Athena(api_key=os.environ["ATHENA_API_KEY"])

result = execute_aop_with_polling(
    client=client,
    aop_asset_id="asset_xxx",
    user_inputs={"company": "Acme Corp"},
    timeout_seconds=1800,
    poll_interval=5
)

print(f"Completed: {result.status}")
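
execute_aop_with_polling raises RuntimeError when the AOP reports failed. If re-running the AOP is safe for your workflow, a small wrapper can retry it. This is a sketch under that assumption. run_with_retries, max_attempts, delay_seconds, and retry_on are hypothetical names and are not part of the SDK. By default it does not retry on TimeoutError, because a timed-out thread may still be running.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    run: Callable[[], T],
    max_attempts: int = 3,
    delay_seconds: float = 30.0,
    retry_on: tuple[type[BaseException], ...] = (RuntimeError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call run() until it succeeds or max_attempts is exhausted."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(1, max_attempts + 1):
        try:
            return run()
        except retry_on as exc:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error unchanged.
            print(f"Attempt {attempt}/{max_attempts} failed: {exc}. Retrying.")
            sleep(delay_seconds)
```

A call might look like run_with_retries(lambda: execute_aop_with_polling(client, "asset_xxx", {"company": "Acme Corp"})). Each attempt starts a new thread, so use this only for AOPs that can safely run more than once.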
7

Batch Processing

Process multiple AOPs sequentially with proper polling for each:

def execute_aops_sequentially(
    client: Athena,
    aop_configs: list[dict],
    timeout_seconds: int = 3600,
    poll_interval: int = 5
) -> list[dict]:
    """Execute multiple AOPs one after another, polling each to completion."""
    results = []

    for i, config in enumerate(aop_configs):
        print(f"[{i + 1}/{len(aop_configs)}] Starting: {config['asset_id']}")

        try:
            result = execute_aop_with_polling(
                client=client,
                aop_asset_id=config["asset_id"],
                user_inputs=config.get("user_inputs", {}),
                timeout_seconds=timeout_seconds,
                poll_interval=poll_interval
            )
            results.append({
                "asset_id": config["asset_id"],
                "status": "completed",
                "result": result
            })
        except (TimeoutError, RuntimeError) as e:
            # Record the failure and move on to the next AOP
            results.append({
                "asset_id": config["asset_id"],
                "status": "failed",
                "error": str(e)
            })

    return results


# Usage
aop_configs = [
    {"asset_id": "asset_research_aop", "user_inputs": {"company": "Acme Corp"}},
    {"asset_id": "asset_analysis_aop", "user_inputs": {"quarter": "Q1 2024"}},
    {"asset_id": "asset_report_aop", "user_inputs": {"format": "summary"}},
]

results = execute_aops_sequentially(client, aop_configs)

for r in results:
    print(f"{r['asset_id']}: {r['status']}")
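
Sequential execution means the total time is the sum of every AOP's run time. Because most of that time is spent sleeping between polls, independent AOPs can be overlapped with a thread pool. The sketch below rests on two assumptions the guide does not confirm: the AOPs do not depend on each other's output, and the client is safe to share across threads. execute_aops_concurrently, run_one, and max_workers are hypothetical names. run_one is any callable that takes one config dict and blocks until that AOP finishes.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable


def execute_aops_concurrently(
    run_one: Callable[[dict], Any],
    aop_configs: list[dict],
    max_workers: int = 4,
) -> list[dict]:
    """Run run_one(config) for every config in a thread pool.

    Results come back in the same order as aop_configs.
    """
    results: list[dict] = [{} for _ in aop_configs]

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to the index of the config it belongs to.
        futures = {
            pool.submit(run_one, config): i
            for i, config in enumerate(aop_configs)
        }
        for future in as_completed(futures):
            i = futures[future]
            asset_id = aop_configs[i]["asset_id"]
            try:
                results[i] = {
                    "asset_id": asset_id,
                    "status": "completed",
                    "result": future.result(),
                }
            except (TimeoutError, RuntimeError) as e:
                results[i] = {
                    "asset_id": asset_id,
                    "status": "failed",
                    "error": str(e),
                }

    return results
```

To reuse the helper from the previous step, run_one could be lambda cfg: execute_aop_with_polling(client, cfg["asset_id"], cfg.get("user_inputs", {})). Keep max_workers small so the combined polling traffic stays modest.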
8

Error Handling

Handle the full range of errors that can occur during async execution:

import time
from athena.client import Athena
from athena.core import ApiError


def execute_aop_robust(
    client: Athena,
    aop_asset_id: str,
    user_inputs: dict,
    timeout_seconds: int = 3600,
    poll_interval: int = 5,
    max_poll_errors: int = 3
):
    """Execute an AOP with comprehensive error handling."""
    # Start execution; a failure here means there is nothing to poll
    try:
        response = client.aop.execute_async(
            asset_id=aop_asset_id,
            user_inputs=user_inputs
        )
    except ApiError as e:
        # Chain the original error so its traceback is preserved
        raise RuntimeError(
            f"Failed to start AOP: {e.status_code} - {e.body}"
        ) from e

    thread_id = response.thread_id
    start_time = time.time()
    consecutive_errors = 0

    while True:
        elapsed = time.time() - start_time

        if elapsed > timeout_seconds:
            raise TimeoutError(
                f"AOP timed out after {elapsed:.1f}s. "
                f"Thread {thread_id} may still be running."
            )

        try:
            status = client.threads.get_status(thread_id=thread_id)
            consecutive_errors = 0  # Reset after any successful poll
        except ApiError as e:
            # Tolerate transient polling failures up to max_poll_errors in a row
            consecutive_errors += 1
            if consecutive_errors >= max_poll_errors:
                raise RuntimeError(
                    f"Lost connection: {consecutive_errors} consecutive "
                    f"polling failures for thread {thread_id}"
                ) from e
            time.sleep(poll_interval)
            continue

        if status.status == "completed":
            return status
        elif status.status == "failed":
            raise RuntimeError(
                f"AOP execution failed for thread {thread_id}"
            )
        elif status.status == "waiting-for-approval":
            # Not terminal: report it and keep polling
            print(
                f"Thread {thread_id} is waiting for approval. "
                f"Approve in the Athena UI to continue."
            )

        time.sleep(poll_interval)

Handled error scenarios:

Error                 | Cause                                       | Handling
----------------------|---------------------------------------------|---------------------------------------------
TimeoutError          | AOP takes longer than timeout_seconds       | Raise with thread ID so you can check later
ApiError on start     | Invalid asset ID, auth failure, bad inputs  | Raise immediately, do not poll
ApiError during poll  | Transient network issue                     | Retry up to max_poll_errors times, then raise
failed status         | AOP execution error                         | Raise RuntimeError with thread ID
waiting-for-approval  | AOP requires human approval step            | Log a message, keep polling
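
In execute_aop_robust, the waiting-for-approval message is printed on every poll, so a thread that waits ten minutes for approval produces about 120 identical lines at the default interval. One way to quiet this is to report a status only when it changes. The StatusChangeLogger class below is a hypothetical helper sketched for illustration. It relies only on the status strings this guide already mentions.

```python
from typing import Callable, Optional


class StatusChangeLogger:
    """Report a thread's status only when it differs from the previous poll."""

    def __init__(self, thread_id: str, log: Callable[[str], None] = print) -> None:
        self.thread_id = thread_id
        self.log = log
        self.last_status: Optional[str] = None

    def observe(self, status: str) -> bool:
        """Record a polled status; return True if it changed and was logged."""
        if status == self.last_status:
            return False
        self.last_status = status
        if status == "waiting-for-approval":
            self.log(
                f"Thread {self.thread_id} is waiting for approval. "
                "Approve in the Athena UI to continue."
            )
        else:
            self.log(f"Thread {self.thread_id} status: {status}")
        return True
```

Inside the polling loop, create one logger per thread before the loop starts and call logger.observe(status.status) after each successful get_status() call in place of the unconditional print.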
9

Common Mistakes

Single status check (wrong):

# This only checks ONCE and does not wait for completion
response = client.aop.execute_async(
    asset_id="asset_xxx",
    user_inputs={"company": "Acme Corp"}
)
status = client.threads.get_status(thread_id=response.thread_id)
# BUG: AOP is still running, status is NOT "completed" yet

Polling loop (correct):

1# This properly waits for completion
2response = client.aop.execute_async(
3 asset_id="asset_xxx",
4 user_inputs={"company": "Acme Corp"}
5)
6while True:
7 status = client.threads.get_status(thread_id=response.thread_id)
8 if status.status in ["completed", "failed"]:
9 break
10 time.sleep(5)

The difference: execute_async() starts execution and returns immediately. If you only check status once, the AOP is almost certainly still running. You need the while True loop to keep checking until a terminal status is reached.