A2A 伺服器
A2A 伺服器讓您能夠透過標準化的 A2A (Agent-to-Agent) 協定公開 AI 代理程式。它提供了 A2A 協定規範的完整實作,負責處理用戶端請求、執行代理程式邏輯、管理複雜的任務生命週期,並支援即時串流回應。
相依性
要在您的專案中使用 A2A 伺服器,請將以下相依性新增至 build.gradle.kts:
dependencies {
// 核心 A2A 伺服器函式庫
implementation("ai.koog:a2a-server:$koogVersion")
// HTTP JSON-RPC 傳輸 (最常見)
implementation("ai.koog:a2a-transport-server-jsonrpc-http:$koogVersion")
// Ktor 伺服器引擎 (選擇符合您需求的)
implementation("io.ktor:ktor-server-netty:$ktorVersion")
}總覽
A2A 伺服器作為 A2A 協定傳輸層與您的自訂代理程式邏輯之間的橋樑。它協調整個請求生命週期,同時保持協定合規性並提供強大的會話管理。
核心元件
A2AServer
實作完整 A2A 協定的主要伺服器類別。它作為中央協調器,負責:
- 驗證傳入請求是否符合協定規範
- 管理並行會話和任務生命週期
- 協調傳輸、儲存和業務邏輯層之間的通訊
- 處理所有協定操作:訊息傳送、任務查詢、取消、推播通知
A2AServer 接受兩個必要參數:
AgentExecutor,定義代理程式的業務邏輯實作AgentCard,定義代理程式功能和中繼資料
以及許多可用於自訂其儲存和傳輸行為的可選參數。
AgentExecutor
AgentExecutor 介面是您實作代理程式核心業務邏輯的地方。它作為 A2A 協定與您的特定 AI 代理程式功能之間的橋樑。要開始執行您的代理程式,您必須實作 execute 方法來定義代理程式的邏輯。要取消代理程式,您必須實作 cancel 方法。
class MyAgentExecutor : AgentExecutor {
override suspend fun execute(
context: RequestContext<MessageSendParams>,
eventProcessor: SessionEventProcessor
) {
// 代理程式邏輯在此
}
override suspend fun cancel(
context: RequestContext<TaskIdParams>,
eventProcessor: SessionEventProcessor,
agentJob: Deferred<Unit>?
) {
// 在此取消代理程式,可選
}
}RequestContext 提供關於目前請求的豐富資訊,包括目前會話的 contextId 和 taskId、傳送的 message 以及請求的 params。
SessionEventProcessor 與用戶端通訊:
sendMessage(message):傳送即時回應(聊天風格互動)sendTaskEvent(event):傳送任務相關更新(長時間執行的操作)
// 用於即時回應(例如聊天機器人)
eventProcessor.sendMessage(
Message(
messageId = generateId(),
role = Role.Agent,
parts = listOf(TextPart("Here's your answer!")),
contextId = context.contextId
)
)
// 用於任務型操作
eventProcessor.sendTaskEvent(
TaskStatusUpdateEvent(
contextId = context.contextId,
taskId = context.taskId,
status = TaskStatus(
state = TaskState.Working,
message = Message(/* progress update */), // 進度更新
timestamp = Clock.System.now()
),
final = false // 還有更多更新會到來
)
)AgentCard
AgentCard 作為您的代理程式的自述清單。它告訴用戶端您的代理程式能做什麼、如何與其通訊以及它有哪些安全性要求。
val agentCard = AgentCard(
// 基本身份
name = "Advanced Recipe Assistant",
description = "AI agent specialized in cooking advice, recipe generation, and meal planning",
version = "2.1.0",
protocolVersion = "0.3.0",
// 通訊設定
url = "https://api.example.com/a2a",
preferredTransport = TransportProtocol.JSONRPC,
// 可選:多重傳輸支援
additionalInterfaces = listOf(
AgentInterface("https://api.example.com/a2a", TransportProtocol.JSONRPC),
),
// 功能宣告
capabilities = AgentCapabilities(
streaming = true, // 支援即時回應
pushNotifications = true, // 傳送非同步通知
stateTransitionHistory = true // 維護任務歷史記錄
),
// 內容類型支援
defaultInputModes = listOf("text/plain", "text/markdown", "image/jpeg"),
defaultOutputModes = listOf("text/plain", "text/markdown", "application/json"),
// 定義可用的安全方案
securitySchemes = mapOf(
"bearer" to HTTPAuthSecurityScheme(
scheme = "Bearer",
bearerFormat = "JWT",
description = "JWT token authentication"
),
"api-key" to APIKeySecurityScheme(
`in` = In.Header,
name = "X-API-Key",
description = "API key for service authentication"
)
),
// 指定安全要求(要求的邏輯 OR)
security = listOf(
mapOf("bearer" to listOf("read", "write")), // 選項 1:具有讀寫範圍的 JWT
mapOf("api-key" to emptyList()) // 選項 2:API 金鑰
),
// 啟用已驗證用戶的擴展卡
supportsAuthenticatedExtendedCard = true,
// 技能/功能
skills = listOf(
AgentSkill(
id = "recipe-generation",
name = "Recipe Generation",
description = "Generate custom recipes based on ingredients, dietary restrictions, and preferences",
tags = listOf("cooking", "recipes", "nutrition"),
examples = listOf(
"Create a vegan pasta recipe with mushrooms",
"I have chicken, rice, and vegetables. What can I make?"
)
),
AgentSkill(
id = "meal-planning",
name = "Meal Planning",
description = "Plan weekly meals and generate shopping lists",
tags = listOf("meal-planning", "nutrition", "shopping")
)
),
// 可選:品牌
iconUrl = "https://example.com/agent-icon.png",
documentationUrl = "https://docs.example.com/recipe-agent",
provider = AgentProvider(
organization = "CookingAI Inc.",
url = "https://cookingai.com"
)
)傳輸層
A2A 本身支援多種傳輸協定,用於與用戶端通訊。目前,Koog 提供透過 HTTP 的 JSON-RPC 伺服器傳輸實作。
HTTP JSON-RPC 傳輸
val transport = HttpJSONRPCServerTransport(server)
transport.start(
engineFactory = CIO, // Ktor 引擎 (CIO, Netty, Jetty)
port = 8080, // 伺服器連接埠
path = "/a2a", // API 端點路徑
wait = true // 阻擋直到伺服器停止
)儲存
A2A 伺服器使用可插拔的儲存架構,分離不同類型的資料。所有儲存實作都是可選的,並預設為開發用途的記憶體內變體。
- TaskStorage:任務生命週期管理 – 儲存和管理任務狀態、歷史記錄和產物
- MessageStorage:會話歷史記錄 – 管理會話上下文中的訊息歷史記錄
- PushNotificationConfigStorage:Webhook 管理 – 管理用於非同步通知的 Webhook 配置
快速入門
1. 建立 AgentCard
定義您的代理程式的功能和中繼資料。
val agentCard = AgentCard(
name = "IO Assistant",
description = "AI agent specialized in input modification",
version = "2.1.0",
protocolVersion = "0.3.0",
// 通訊設定
url = "https://api.example.com/a2a",
preferredTransport = TransportProtocol.JSONRPC,
// 功能宣告
capabilities =
AgentCapabilities(
streaming = true, // 支援即時回應
pushNotifications = true, // 傳送非同步通知
stateTransitionHistory = true // 維護任務歷史記錄
),
// 內容類型支援
defaultInputModes = listOf("text/plain", "text/markdown", "image/jpeg"),
defaultOutputModes = listOf("text/plain", "text/markdown", "application/json"),
// 技能/功能
skills = listOf(
AgentSkill(
id = "echo",
name = "echo",
description = "Echoes back user messages",
tags = listOf("io"),
)
)
)2. 建立 AgentExecutor
執行器負責實作代理程式邏輯,處理傳入請求並傳送回應。
class EchoAgentExecutor : AgentExecutor {
override suspend fun execute(
context: RequestContext<MessageSendParams>,
eventProcessor: SessionEventProcessor
) {
val userMessage = context.params.message
val userText = userMessage.parts
.filterIsInstance<TextPart>()
.joinToString(" ") { it.text }
// 將用戶訊息回應回去
val response = Message(
messageId = UUID.randomUUID().toString(),
role = Role.Agent,
parts = listOf(TextPart("You said: $userText")),
contextId = context.contextId,
taskId = context.taskId
)
eventProcessor.sendMessage(response)
}
}2. 建立伺服器
將代理程式執行器和代理程式卡傳遞給伺服器。
val server = A2AServer(
agentExecutor = EchoAgentExecutor(),
agentCard = agentCard
)3. 加入傳輸層
建立一個傳輸層並啟動伺服器。
// HTTP JSON-RPC 傳輸
val transport = HttpJSONRPCServerTransport(server)
transport.start(
engineFactory = CIO,
port = 8080,
path = "/agent",
wait = true
)代理程式實作模式
簡單回應代理程式
如果您的代理程式只需要回應單一訊息,您可以將其實作成一個簡單的代理程式。如果代理程式的執行邏輯不複雜且不耗時,也可以使用這種方式。
class SimpleAgentExecutor : AgentExecutor {
override suspend fun execute(
context: RequestContext<MessageSendParams>,
eventProcessor: SessionEventProcessor
) {
val response = Message(
messageId = UUID.randomUUID().toString(),
role = Role.Agent,
parts = listOf(TextPart("Hello from agent!")),
contextId = context.contextId,
taskId = context.taskId
)
eventProcessor.sendMessage(response)
}
}任務型代理程式
如果您的代理程式執行邏輯複雜且需要多個步驟,您可以將其實作成一個任務型代理程式。如果代理程式執行邏輯耗時且需要暫停(suspending),也可以使用這種方式。
class TaskAgentExecutor : AgentExecutor {
override suspend fun execute(
context: RequestContext<MessageSendParams>,
eventProcessor: SessionEventProcessor
) {
// 傳送工作狀態
eventProcessor.sendTaskEvent(
TaskStatusUpdateEvent(
contextId = context.contextId,
taskId = context.taskId,
status = TaskStatus(
state = TaskState.Working,
timestamp = Clock.System.now()
),
final = false
)
)
// 執行工作...
// 傳送完成狀態
eventProcessor.sendTaskEvent(
TaskStatusUpdateEvent(
contextId = context.contextId,
taskId = context.taskId,
status = TaskStatus(
state = TaskState.Completed,
timestamp = Clock.System.now()
),
final = true
)
)
}
}