Python Integration Examples
This page provides practical examples of integrating OpenConvert into Python applications.
Web Framework Integration
Flask Application
from flask import Flask, request, send_file, jsonify, render_template
from openconvert import convert_file
import tempfile
import os
from werkzeug.utils import secure_filename
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max file size
ALLOWED_EXTENSIONS = {'txt', 'md', 'csv', 'jpg', 'png', 'gif'}
def allowed_file(filename):
return '.' in filename and \\
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@app.route('/')
def index():
return render_template('upload.html')
@app.route('/convert', methods=['POST'])
def convert_endpoint():
"""API endpoint for file conversion."""
if 'file' not in request.files:
return jsonify({'error': 'No file provided'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'No file selected'}), 400
if not allowed_file(file.filename):
return jsonify({'error': 'File type not allowed'}), 400
target_format = request.form.get('format', 'application/pdf')
prompt = request.form.get('prompt', '')
with tempfile.TemporaryDirectory() as temp_dir:
# Save uploaded file
filename = secure_filename(file.filename)
input_path = os.path.join(temp_dir, filename)
file.save(input_path)
# Convert file
output_filename = f"{os.path.splitext(filename)[0]}.pdf"
output_path = os.path.join(temp_dir, output_filename)
try:
success = convert_file(
input_path,
output_path,
to_format=target_format,
prompt=prompt if prompt else None
)
if success:
return send_file(
output_path,
as_attachment=True,
download_name=output_filename
)
else:
return jsonify({'error': 'Conversion failed'}), 500
except Exception as e:
return jsonify({'error': f'Conversion error: {str(e)}'}), 500
@app.route('/formats')
def list_formats():
"""Get available conversion formats."""
# This would integrate with OpenConvert's format discovery
formats = {
'text/plain': ['application/pdf', 'text/html'],
'image/jpeg': ['image/png', 'image/webp', 'application/pdf'],
'text/csv': ['application/pdf', 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet']
}
return jsonify(formats)
if __name__ == '__main__':
app.run(debug=True)
FastAPI Application
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import FileResponse
from openconvert import convert_file
import tempfile
import os
from typing import Optional
app = FastAPI(title="OpenConvert API", version="1.0.0")
@app.post("/convert")
async def convert_file_endpoint(
file: UploadFile = File(...),
target_format: str = Form("application/pdf"),
prompt: Optional[str] = Form(None)
):
"""Convert uploaded file to specified format."""
if not file.filename:
raise HTTPException(status_code=400, detail="No file provided")
with tempfile.TemporaryDirectory() as temp_dir:
# Save uploaded file
input_path = os.path.join(temp_dir, file.filename)
with open(input_path, "wb") as buffer:
content = await file.read()
buffer.write(content)
# Convert file
output_filename = f"{os.path.splitext(file.filename)[0]}.pdf"
output_path = os.path.join(temp_dir, output_filename)
try:
success = convert_file(
input_path,
output_path,
to_format=target_format,
prompt=prompt
)
if success:
return FileResponse(
output_path,
filename=output_filename,
media_type='application/octet-stream'
)
else:
raise HTTPException(status_code=500, detail="Conversion failed")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error: {str(e)}")
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy"}
Django Integration
# views.py
from django.shortcuts import render
from django.http import HttpResponse, JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.core.files.storage import default_storage
from django.core.files.base import ContentFile
from openconvert import convert_file
import tempfile
import os
import json
@csrf_exempt
def convert_view(request):
if request.method == 'POST':
if 'file' not in request.FILES:
return JsonResponse({'error': 'No file provided'}, status=400)
uploaded_file = request.FILES['file']
target_format = request.POST.get('format', 'application/pdf')
prompt = request.POST.get('prompt', '')
with tempfile.TemporaryDirectory() as temp_dir:
# Save uploaded file
input_path = os.path.join(temp_dir, uploaded_file.name)
with open(input_path, 'wb+') as destination:
for chunk in uploaded_file.chunks():
destination.write(chunk)
# Convert file
output_filename = f"{os.path.splitext(uploaded_file.name)[0]}.pdf"
output_path = os.path.join(temp_dir, output_filename)
try:
success = convert_file(
input_path,
output_path,
to_format=target_format,
prompt=prompt if prompt else None
)
if success:
with open(output_path, 'rb') as f:
response = HttpResponse(
f.read(),
content_type='application/octet-stream'
)
response['Content-Disposition'] = f'attachment; filename="{output_filename}"'
return response
else:
return JsonResponse({'error': 'Conversion failed'}, status=500)
except Exception as e:
return JsonResponse({'error': str(e)}, status=500)
return render(request, 'convert.html')
Desktop Application Integration
Tkinter GUI Application
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
from openconvert import convert_file
import threading
import os
class OpenConvertGUI:
def __init__(self, root):
self.root = root
self.root.title("OpenConvert GUI")
self.root.geometry("600x400")
self.setup_ui()
def setup_ui(self):
# File selection
file_frame = ttk.Frame(self.root, padding="10")
file_frame.grid(row=0, column=0, sticky=(tk.W, tk.E))
ttk.Label(file_frame, text="Input File:").grid(row=0, column=0, sticky=tk.W)
self.file_path = tk.StringVar()
ttk.Entry(file_frame, textvariable=self.file_path, width=50).grid(row=0, column=1, padx=5)
ttk.Button(file_frame, text="Browse", command=self.browse_file).grid(row=0, column=2)
# Output format selection
format_frame = ttk.Frame(self.root, padding="10")
format_frame.grid(row=1, column=0, sticky=(tk.W, tk.E))
ttk.Label(format_frame, text="Output Format:").grid(row=0, column=0, sticky=tk.W)
self.output_format = tk.StringVar(value="PDF")
format_combo = ttk.Combobox(
format_frame,
textvariable=self.output_format,
values=["PDF", "Word Document", "PNG Image", "WebP Image"]
)
format_combo.grid(row=0, column=1, padx=5)
# Prompt input
prompt_frame = ttk.Frame(self.root, padding="10")
prompt_frame.grid(row=2, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
ttk.Label(prompt_frame, text="Conversion Prompt (optional):").grid(row=0, column=0, sticky=tk.W)
self.prompt_text = tk.Text(prompt_frame, height=6, width=70)
self.prompt_text.grid(row=1, column=0, columnspan=2, pady=5)
# Convert button
button_frame = ttk.Frame(self.root, padding="10")
button_frame.grid(row=3, column=0)
self.convert_button = ttk.Button(
button_frame,
text="Convert File",
command=self.convert_file_threaded
)
self.convert_button.grid(row=0, column=0, padx=5)
# Progress bar
self.progress = ttk.Progressbar(button_frame, mode='indeterminate')
self.progress.grid(row=0, column=1, padx=5)
# Status label
self.status_label = ttk.Label(self.root, text="Ready")
self.status_label.grid(row=4, column=0, pady=5)
def browse_file(self):
filename = filedialog.askopenfilename(
title="Select file to convert",
filetypes=[
("Text files", "*.txt"),
("Markdown files", "*.md"),
("CSV files", "*.csv"),
("Image files", "*.jpg *.jpeg *.png *.gif"),
("All files", "*.*")
]
)
if filename:
self.file_path.set(filename)
def get_mime_type(self, format_name):
format_map = {
"PDF": "application/pdf",
"Word Document": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"PNG Image": "image/png",
"WebP Image": "image/webp"
}
return format_map.get(format_name, "application/pdf")
def convert_file_threaded(self):
"""Run conversion in separate thread to avoid freezing UI."""
thread = threading.Thread(target=self.convert_file_worker)
thread.daemon = True
thread.start()
def convert_file_worker(self):
input_path = self.file_path.get()
if not input_path:
messagebox.showerror("Error", "Please select an input file")
return
if not os.path.exists(input_path):
messagebox.showerror("Error", "Input file does not exist")
return
# Start progress animation
self.progress.start()
self.convert_button.config(state='disabled')
self.status_label.config(text="Converting...")
try:
# Generate output path
base_name = os.path.splitext(input_path)[0]
output_path = f"{base_name}_converted.pdf"
# Get conversion parameters
target_format = self.get_mime_type(self.output_format.get())
prompt = self.prompt_text.get("1.0", tk.END).strip()
# Perform conversion
success = convert_file(
input_path,
output_path,
to_format=target_format,
prompt=prompt if prompt else None
)
# Update UI on main thread
self.root.after(0, self.conversion_complete, success, output_path)
except Exception as e:
self.root.after(0, self.conversion_error, str(e))
def conversion_complete(self, success, output_path):
self.progress.stop()
self.convert_button.config(state='normal')
if success:
self.status_label.config(text=f"Conversion completed: {output_path}")
messagebox.showinfo("Success", f"File converted successfully!\\nOutput: {output_path}")
else:
self.status_label.config(text="Conversion failed")
messagebox.showerror("Error", "Conversion failed")
def conversion_error(self, error_message):
self.progress.stop()
self.convert_button.config(state='normal')
self.status_label.config(text="Error occurred")
messagebox.showerror("Error", f"Conversion error: {error_message}")
if __name__ == "__main__":
root = tk.Tk()
app = OpenConvertGUI(root)
root.mainloop()
Data Science Integration
Jupyter Notebook Helper
import pandas as pd
import matplotlib.pyplot as plt
from openconvert import convert_file
import tempfile
import os
from IPython.display import display, HTML
class NotebookConverter:
"""Helper class for converting data and plots in Jupyter notebooks."""
def __init__(self):
self.temp_files = []
def dataframe_to_pdf(self, df, filename="data_report.pdf", prompt=None):
"""Convert DataFrame to PDF report."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f:
df.to_csv(f.name, index=False)
self.temp_files.append(f.name)
success = convert_file(
f.name,
filename,
to_format="application/pdf",
prompt=prompt or "Create formatted data table with headers"
)
if success:
print(f"✅ DataFrame converted to {filename}")
return filename
else:
print("❌ Conversion failed")
return None
def plot_to_pdf(self, fig, filename="plot.pdf", prompt=None):
"""Convert matplotlib figure to PDF."""
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as f:
fig.savefig(f.name, dpi=300, bbox_inches='tight')
self.temp_files.append(f.name)
success = convert_file(
f.name,
filename,
to_format="application/pdf",
prompt=prompt or "High quality plot for publication"
)
if success:
print(f"✅ Plot converted to {filename}")
return filename
else:
print("❌ Conversion failed")
return None
def create_report(self, dataframes, plots, output_file="analysis_report.pdf"):
"""Create comprehensive report from multiple dataframes and plots."""
# This would combine multiple elements into a single report
# Implementation would depend on specific requirements
pass
def cleanup(self):
"""Clean up temporary files."""
for temp_file in self.temp_files:
try:
os.unlink(temp_file)
except:
pass
self.temp_files = []
# Usage example
converter = NotebookConverter()
# Convert DataFrame
df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})
converter.dataframe_to_pdf(df, "my_data.pdf", "Professional data table")
# Convert plot
fig, ax = plt.subplots()
ax.plot([1, 2, 3], [4, 5, 6])
converter.plot_to_pdf(fig, "my_plot.pdf", "Scientific publication quality")
# Cleanup when done
converter.cleanup()
Automation and Scripting
File Watcher Service
import time
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from openconvert import convert_file
import logging
class ConversionHandler(FileSystemEventHandler):
"""Handler for automatic file conversion."""
def __init__(self, output_dir, target_format="application/pdf", prompt=None):
self.output_dir = output_dir
self.target_format = target_format
self.prompt = prompt
# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
# Setup logging
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
def on_created(self, event):
"""Handle new file creation."""
if event.is_directory:
return
file_path = event.src_path
# Only process certain file types
if not self.should_process(file_path):
return
# Wait a moment for file to be fully written
time.sleep(1)
self.convert_file(file_path)
def should_process(self, file_path):
"""Check if file should be processed."""
_, ext = os.path.splitext(file_path)
return ext.lower() in ['.txt', '.md', '.csv', '.jpg', '.png']
def convert_file(self, input_path):
"""Convert the file."""
try:
filename = os.path.basename(input_path)
name, _ = os.path.splitext(filename)
output_path = os.path.join(self.output_dir, f"{name}.pdf")
self.logger.info(f"Converting {input_path} to {output_path}")
success = convert_file(
input_path,
output_path,
to_format=self.target_format,
prompt=self.prompt
)
if success:
self.logger.info(f"✅ Successfully converted {filename}")
else:
self.logger.error(f"❌ Failed to convert {filename}")
except Exception as e:
self.logger.error(f"Error converting {input_path}: {e}")
def watch_directory(input_dir, output_dir, target_format="application/pdf", prompt=None):
"""Watch directory for new files and convert them automatically."""
event_handler = ConversionHandler(output_dir, target_format, prompt)
observer = Observer()
observer.schedule(event_handler, input_dir, recursive=True)
observer.start()
print(f"Watching {input_dir} for new files...")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
print("Stopping file watcher...")
observer.join()
# Usage
if __name__ == "__main__":
watch_directory(
"input_files/",
"converted_files/",
prompt="Automatic conversion with professional formatting"
)
Scheduled Batch Processing
import schedule
import time
import os
from pathlib import Path
from openconvert import convert_file
import logging
from datetime import datetime
class ScheduledConverter:
"""Scheduled batch file conversion service."""
def __init__(self, input_dir, output_dir, archive_dir=None):
self.input_dir = Path(input_dir)
self.output_dir = Path(output_dir)
self.archive_dir = Path(archive_dir) if archive_dir else None
# Create directories
self.output_dir.mkdir(exist_ok=True)
if self.archive_dir:
self.archive_dir.mkdir(exist_ok=True)
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('conversion_scheduler.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def process_pending_files(self):
"""Process all pending files in input directory."""
self.logger.info("Starting scheduled conversion batch")
processed_count = 0
failed_count = 0
for file_path in self.input_dir.glob("*"):
if file_path.is_file() and self.should_process(file_path):
try:
if self.convert_single_file(file_path):
processed_count += 1
# Move to archive if configured
if self.archive_dir:
archive_path = self.archive_dir / file_path.name
file_path.rename(archive_path)
else:
failed_count += 1
except Exception as e:
self.logger.error(f"Error processing {file_path}: {e}")
failed_count += 1
self.logger.info(f"Batch completed: {processed_count} successful, {failed_count} failed")
def convert_single_file(self, file_path):
"""Convert a single file."""
output_path = self.output_dir / f"{file_path.stem}.pdf"
self.logger.info(f"Converting {file_path.name}")
success = convert_file(
str(file_path),
str(output_path),
to_format="application/pdf",
prompt="Scheduled batch conversion with standard formatting"
)
if success:
self.logger.info(f"✅ Converted {file_path.name}")
else:
self.logger.error(f"❌ Failed to convert {file_path.name}")
return success
def should_process(self, file_path):
"""Check if file should be processed."""
return file_path.suffix.lower() in ['.txt', '.md', '.csv']
def start_scheduler(self):
"""Start the scheduled processing."""
# Schedule conversions
schedule.every().hour.do(self.process_pending_files)
schedule.every().day.at("09:00").do(self.process_pending_files)
schedule.every().monday.at("08:00").do(self.weekly_cleanup)
self.logger.info("Scheduler started")
while True:
schedule.run_pending()
time.sleep(60) # Check every minute
def weekly_cleanup(self):
"""Weekly cleanup of old files."""
self.logger.info("Running weekly cleanup")
# Implement cleanup logic here
pass
# Usage
if __name__ == "__main__":
converter = ScheduledConverter(
input_dir="incoming/",
output_dir="converted/",
archive_dir="processed/"
)
converter.start_scheduler()
See Also
Python API - Complete Python API reference
Batch Processing Examples - Batch processing examples
Advanced Usage - Advanced usage patterns