#!/usr/bin/env python3
"""
OpenConvert Client
Client class for connecting to the OpenConvert OpenAgents network,
discovering conversion agents, and performing file conversions.
"""
import asyncio
import logging
import base64
import tempfile
import uuid
from pathlib import Path
from typing import Optional, List, Dict, Any
# Import OpenAgents modules
import sys
import os
# Add openagents src to path.
# Resolve the directory containing this file, walk two directory levels up,
# and put that directory's "src" folder at the front of sys.path.
# NOTE(review): presumably this is what lets the `openagents.*` imports below
# resolve from an in-repo checkout — confirm against the repository layout.
current_dir = Path(__file__).resolve().parent
openagents_root = current_dir.parent.parent
sys.path.insert(0, str(openagents_root / "src"))
from openagents.core.client import AgentClient
from openagents.protocols.discovery.openconvert_discovery.adapter import OpenConvertDiscoveryAdapter
from openagents.protocols.communication.simple_messaging.adapter import SimpleMessagingAgentAdapter
from openagents.models.messages import DirectMessage, BaseMessage
# Module-level logger, named after this module; used by everything below.
logger = logging.getLogger(__name__)
# --- OpenConvert network client ---
class OpenConvertClient:
    """Client for interacting with the OpenConvert OpenAgents network.

    The client wraps an ``AgentClient`` together with a discovery adapter and
    a messaging adapter.  Typical use is ``connect()``, then ``convert_file()``
    or ``convert_files_batch()``, then ``disconnect()``.

    Attributes:
        agent_id: Identifier this client uses on the network.
        client: Underlying ``AgentClient`` instance.
        discovery_adapter: Adapter used to look up conversion agents.
        messaging_adapter: Adapter used to send requests and receive replies.
        conversion_responses: Most recent response payload per sender agent ID.
        connected: True between a successful ``connect()`` and ``disconnect()``.
    """

    # Seconds between checks for an agent's reply while convert_file() waits.
    _POLL_INTERVAL = 0.25

    def __init__(self, agent_id: Optional[str] = None):
        """Initialize the OpenConvert client.

        Args:
            agent_id: Optional agent ID. If not provided, a random one will be
                generated.
        """
        self.agent_id = agent_id or f"openconvert-client-{uuid.uuid4().hex[:8]}"
        self.client = AgentClient(self.agent_id)
        self.discovery_adapter = OpenConvertDiscoveryAdapter()
        self.messaging_adapter = SimpleMessagingAgentAdapter()
        self.conversion_responses = {}
        self.connected = False
        logger.info(f"Initialized OpenConvert client with ID: {self.agent_id}")

    async def connect(self, host: str = "network.openconvert.ai", port: int = 8765) -> bool:
        """Connect to the OpenConvert network.

        Connects the underlying client, registers both protocol adapters and
        installs the handler that collects conversion responses.

        Args:
            host: Network host to connect to.
            port: Network port to connect to.

        Returns:
            bool: True if connection successful, False on failure (any
            exception is logged and reported as False).
        """
        try:
            logger.info(f"Connecting to OpenConvert network at {host}:{port}")

            # Connect to the network
            success = await self.client.connect_to_server(
                host=host,
                port=port,
                metadata={
                    "name": "OpenConvert CLI Client",
                    "type": "conversion_client",
                    "capabilities": ["file_conversion_requests"],
                    "version": "1.0.0",
                },
            )
            if not success:
                logger.error("Failed to connect to OpenConvert network")
                return False

            # Register protocol adapters
            self.client.register_protocol_adapter(self.discovery_adapter)
            self.client.register_protocol_adapter(self.messaging_adapter)

            # Set up message handler for conversion responses
            self.messaging_adapter.register_message_handler(
                "conversion_response", self._handle_conversion_response
            )

            self.connected = True
            logger.info("✅ Successfully connected to OpenConvert network")

            # Give some time for protocol registration
            await asyncio.sleep(1)
            return True
        except Exception as e:
            logger.error(f"Error connecting to network: {e}")
            return False

    def _handle_conversion_response(self, content: Dict[str, Any], sender_id: str) -> None:
        """Handle conversion response messages.

        Only messages that carry a ``conversion_status`` or whose ``action`` is
        ``"conversion_result"`` are stored; everything else is ignored.

        Args:
            content: Message content.
            sender_id: ID of the agent that sent the response.
        """
        if content and (content.get("conversion_status") or content.get("action") == "conversion_result"):
            # Keyed by sender: a newer message from the same agent replaces the old one.
            self.conversion_responses[sender_id] = content
            logger.debug(f"Received conversion response from {sender_id}")

    @staticmethod
    def _response_outcome(response: Dict[str, Any]) -> Optional[str]:
        """Classify a stored response as finished or still pending.

        Args:
            response: Response payload as stored by the message handler.

        Returns:
            ``"success"`` or ``"error"`` for a terminal response, ``None`` when
            the payload carries neither outcome yet.
        """
        status = response.get("conversion_status")
        flag = response.get("success")
        # Equality (not identity) on purpose: 1/0 flags compare equal to
        # True/False, which is how the flag was always evaluated here.
        if status == "success" or flag == True:  # noqa: E712
            return "success"
        if status == "error" or flag == False:  # noqa: E712
            return "error"
        return None

    def _write_converted_file(self, response: Dict[str, Any], output_file: Path) -> bool:
        """Decode the base64 payload of a successful response and write it to disk.

        Args:
            response: Successful response payload; the data is read from
                ``file_data`` or, failing that, ``output_data``.
            output_file: Destination path; missing parent directories are created.

        Returns:
            bool: True if the file was written, False if the payload was
            missing or decoding/writing failed (the error is logged).
        """
        converted_data = response.get("file_data") or response.get("output_data")
        if not converted_data:
            logger.error("❌ No converted data in response")
            return False
        try:
            converted_bytes = base64.b64decode(converted_data)
            # Ensure output directory exists
            output_file.parent.mkdir(parents=True, exist_ok=True)
            output_file.write_bytes(converted_bytes)
        except Exception as e:
            logger.error(f"❌ Error saving converted file: {e}")
            return False
        logger.info(f"✅ Conversion successful: {output_file}")
        return True

    async def discover_agents(self, source_format: str, target_format: str) -> List[Dict[str, Any]]:
        """Discover agents capable of performing a specific conversion.

        Args:
            source_format: Source MIME type.
            target_format: Target MIME type.

        Returns:
            List of agent information dictionaries; empty if discovery fails.

        Raises:
            RuntimeError: If the client is not connected.
        """
        if not self.connected:
            raise RuntimeError("Client is not connected to network")

        logger.info(f"🔍 Discovering agents for {source_format} -> {target_format}")
        try:
            # Use the discovery adapter to find suitable agents
            agents = await self.discovery_adapter.discover_conversion_agents(source_format, target_format)
            logger.info(f"📋 Found {len(agents)} capable agents:")
            for agent in agents:
                agent_id = agent.get('agent_id', 'Unknown')
                description = agent.get('description', 'No description')
                logger.info(f" - {agent_id}: {description}")
            return agents
        except Exception as e:
            logger.error(f"Error during agent discovery: {e}")
            return []

    async def convert_file(
        self,
        input_file: Path,
        output_file: Path,
        source_format: str,
        target_format: str,
        prompt: Optional[str] = None,
        timeout: int = 60
    ) -> bool:
        """Convert a single file using the OpenConvert network.

        The file is base64-encoded and sent as a direct message to the first
        agent returned by discovery; the method then polls for that agent's
        reply until it reports success or error, or ``timeout`` elapses.

        Args:
            input_file: Path to input file (a string path is accepted too).
            output_file: Path to output file (a string path is accepted too).
            source_format: Source MIME type.
            target_format: Target MIME type.
            prompt: Optional conversion instructions.
            timeout: Timeout in seconds for conversion.

        Returns:
            bool: True if conversion successful.

        Raises:
            RuntimeError: If the client is not connected.
            FileNotFoundError: If ``input_file`` does not exist.
        """
        if not self.connected:
            raise RuntimeError("Client is not connected to network")

        input_file = Path(input_file)
        output_file = Path(output_file)
        if not input_file.exists():
            raise FileNotFoundError(f"Input file not found: {input_file}")

        logger.info(f"🔄 Converting {input_file.name}: {source_format} -> {target_format}")
        try:
            # Discover agents capable of this conversion
            agents = await self.discover_agents(source_format, target_format)
            if not agents:
                logger.error(f"❌ No agents found for {source_format} -> {target_format} conversion")
                return False

            # Use the first available agent
            agent_id = agents[0].get('agent_id')
            if not agent_id:
                # Without an ID there is nobody to address the request to.
                logger.error("❌ Discovered agent has no agent_id")
                return False
            logger.info(f"🎯 Using agent: {agent_id}")

            # Read and encode the input file
            request_content = {
                "file_data": base64.b64encode(input_file.read_bytes()).decode('ascii'),
                "filename": input_file.name,
                "source_format": source_format,
                "target_format": target_format,
            }
            if prompt:
                request_content["prompt"] = prompt
                logger.info(f"💬 Using prompt: {prompt}")

            # Clear any previous response so a stale one is not mistaken for ours.
            self.conversion_responses.pop(agent_id, None)

            logger.info(f"📤 Sending conversion request to {agent_id}")
            await self.messaging_adapter.send_direct_message(agent_id, request_content)

            # Poll against a monotonic deadline so the timeout tracks real
            # elapsed time and a fast reply is picked up quickly.
            loop = asyncio.get_running_loop()
            deadline = loop.time() + timeout
            while True:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                await asyncio.sleep(min(self._POLL_INTERVAL, remaining))

                response = self.conversion_responses.get(agent_id)
                if response is None:
                    continue
                outcome = self._response_outcome(response)
                if outcome is None:
                    # Not a final result yet; keep waiting for a newer message.
                    continue

                # Final result: drop it so the (possibly large) payload is not
                # kept alive in the response table after it has been handled.
                self.conversion_responses.pop(agent_id, None)
                if outcome == "error":
                    error_msg = response.get("error", "Unknown error")
                    logger.error(f"❌ Conversion failed: {error_msg}")
                    return False
                return self._write_converted_file(response, output_file)

            # Timeout reached
            logger.error(f"❌ Conversion timeout after {timeout} seconds")
            return False
        except Exception as e:
            logger.error(f"❌ Error during file conversion: {e}")
            return False

    async def convert_files_batch(
        self,
        files: List[Dict[str, Any]],
        timeout: int = 60
    ) -> List[bool]:
        """Convert multiple files in batch, one after another.

        Args:
            files: List of file conversion specifications, each containing:
                - input_file: Path to input file
                - output_file: Path to output file
                - source_format: Source MIME type
                - target_format: Target MIME type
                - prompt: Optional conversion prompt
            timeout: Timeout per file in seconds.

        Returns:
            List of success flags for each conversion, in input order. A
            specification that raises is recorded as False and does not stop
            the batch.
        """
        results = []
        for i, file_spec in enumerate(files, 1):
            logger.info(f"Processing file {i}/{len(files)}")
            try:
                success = await self.convert_file(
                    input_file=file_spec['input_file'],
                    output_file=file_spec['output_file'],
                    source_format=file_spec['source_format'],
                    target_format=file_spec['target_format'],
                    prompt=file_spec.get('prompt'),
                    timeout=timeout,
                )
                results.append(success)
            except Exception as e:
                logger.error(f"❌ Error processing file {i}: {e}")
                results.append(False)
        return results

    async def list_available_conversions(self) -> Dict[str, List[str]]:
        """Get a list of all available conversions in the network.

        Probes every ordered pair of a fixed set of common MIME types through
        ``discover_agents`` and records the pairs for which an agent exists.

        Returns:
            Dictionary mapping source formats to lists of available target
            formats (every probed source format is present, possibly with an
            empty list).

        Raises:
            RuntimeError: If the client is not connected.
        """
        if not self.connected:
            raise RuntimeError("Client is not connected to network")

        logger.info("📋 Querying available conversions...")

        # This would require a more sophisticated discovery mechanism
        # For now, return a basic set based on known conversion categories
        common_formats = [
            'text/plain', 'text/markdown', 'text/html', 'application/pdf',
            'image/png', 'image/jpeg', 'image/gif', 'image/bmp',
            'audio/mp3', 'audio/wav', 'video/mp4', 'application/zip',
        ]

        conversions = {}
        for source_format in common_formats:
            conversions[source_format] = []
            for target_format in common_formats:
                if source_format == target_format:
                    continue
                agents = await self.discover_agents(source_format, target_format)
                if agents:
                    conversions[source_format].append(target_format)
        return conversions

    async def disconnect(self) -> None:
        """Disconnect from the OpenConvert network (no-op if not connected)."""
        if not self.connected:
            return
        logger.info("🔌 Disconnecting from OpenConvert network")
        try:
            await self.client.disconnect()
        finally:
            # Reset the flag even if the transport raised, so the client is
            # never left claiming to be connected after a failed teardown.
            self.connected = False
        logger.info("✅ Disconnected successfully")
# Convenience functions for direct usage
async def convert_file(
    input_file: Path,
    output_file: Path,
    source_format: Optional[str] = None,
    target_format: Optional[str] = None,
    prompt: Optional[str] = None,
    host: str = "network.openconvert.ai",
    port: int = 8765,
    timeout: int = 60,
) -> bool:
    """Convenience function to convert a single file.

    Creates a temporary ``OpenConvertClient``, connects, performs one
    conversion and always disconnects again.

    Args:
        input_file: Path to input file (a string path is accepted too).
        output_file: Path to output file (a string path is accepted too).
        source_format: Source MIME type (guessed from the input file name if
            None, falling back to ``application/octet-stream``).
        target_format: Target MIME type (guessed from the output file name if
            None, falling back to ``application/octet-stream``).
        prompt: Optional conversion instructions.
        host: Network host.
        port: Network port.
        timeout: Timeout in seconds for the conversion.

    Returns:
        bool: True if conversion successful; False if the connection could not
        be established or the conversion failed.

    Raises:
        FileNotFoundError: If ``input_file`` does not exist.
    """
    import mimetypes  # kept function-local, as this module never imports it at top level

    input_file = Path(input_file)
    output_file = Path(output_file)

    # Fail fast: no point opening a network connection for a missing file.
    if not input_file.exists():
        raise FileNotFoundError(f"Input file not found: {input_file}")

    # Auto-detect formats if not provided
    fallback_format = 'application/octet-stream'
    if source_format is None:
        source_format = mimetypes.guess_type(str(input_file))[0] or fallback_format
    if target_format is None:
        target_format = mimetypes.guess_type(str(output_file))[0] or fallback_format

    client = OpenConvertClient()
    try:
        # connect() logs the reason itself; report the failure as False rather
        # than letting the later "not connected" RuntimeError escape.
        if not await client.connect(host=host, port=port):
            return False
        return await client.convert_file(
            input_file=input_file,
            output_file=output_file,
            source_format=source_format,
            target_format=target_format,
            prompt=prompt,
            timeout=timeout,
        )
    finally:
        # disconnect() is a no-op when the connection was never established.
        await client.disconnect()