MCP Durability: Server and Client with Persistence for Long-Running Tasks

MCP Durability: Server and Client with Persistence for Long-Running Tasks MCP Durability: Server and Client with Persistence for Long-Running Tasks

In this post, we will see how to create an MCP server and client with the ability for the client to request a long-running task from the server, stop the client, continue running the task on the server, reconnect the client, and view the status of the task.

MCP server with durabilitylink image 11

The server will be similar to the one we made in the posts streamable MCP and resumable MCP, with the difference that now we are going to add the ability for the client to request a long-running task from the server, stop the client, the task continues to run on the server, reconnect the client, and view the status of the task.

Furthermore, in this case, we are going to save the tasks in an SQLite database to make it more professional, since in previous posts we saved the information in json files.

Implementation of the MCP server with durabilitylink image 12

The server will have a DurableTaskManager class that will manage the creation of tasks, save them in the database, report their status, and be able to cancel them.

It will have three resources: to obtain the status of a task, to list all tasks, and to list tasks by status.

It will have five tools, three of which will be specific tasks (data migration, data processing, and model training), one for canceling a task, and the last one for obtaining server statistics.

Create virtual server environmentlink image 13

First, we create the folder where we are going to develop it.

	
!mkdir MCP_durability_server
Copy

We create the environment with uv

	
!cd MCP_durability_server && uv init .
Copy
	
Initialized project `mcp-durability-server` at `/Users/macm1/Documents/web/portafolio/posts/MCP_durability_server`

We started it

	
!cd MCP_durability_server && uv venv
Copy
	
Using CPython 3.12.8
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate

We install the necessary libraries.

	
!cd MCP_durability_server && uv add fastmcp
Copy
	
Resolved 64 packages in 344ms
Installed 61 packages in 136ms ░░░░░░░░░░░░░░░░░░░░ [0/0] Installing wheels...
+ annotated-types==0.7.0
+ anyio==4.10.0
+ attrs==25.3.0
+ authlib==1.6.3
+ certifi==2025.8.3
+ cffi==1.17.1
+ charset-normalizer==3.4.3
+ click==8.2.1
+ cryptography==45.0.6
+ cyclopts==3.23.0
+ dnspython==2.7.0
+ docstring-parser==0.17.0
+ docutils==0.22
+ email-validator==2.3.0
+ exceptiongroup==1.3.0
+ fastmcp==2.11.3
+ h11==0.16.0
+ httpcore==1.0.9
+ httpx==0.28.1
+ httpx-sse==0.4.1
+ idna==3.10
+ isodate==0.7.2
+ jsonschema==4.25.1
+ jsonschema-path==0.3.4
+ jsonschema-specifications==2025.4.1
+ lazy-object-proxy==1.12.0
+ markdown-it-py==4.0.0
+ markupsafe==3.0.2
+ mcp==1.13.1
+ mdurl==0.1.2
+ more-itertools==10.7.0
+ openapi-core==0.19.5
+ openapi-pydantic==0.5.1
+ openapi-schema-validator==0.6.3
+ openapi-spec-validator==0.7.2
+ parse==1.20.2
+ pathable==0.4.4
+ pycparser==2.22
+ pydantic==2.11.7
+ pydantic-core==2.33.2
+ pydantic-settings==2.10.1
+ pygments==2.19.2
+ pyperclip==1.9.0
+ python-dotenv==1.1.1
+ python-multipart==0.0.20
+ pyyaml==6.0.2
+ referencing==0.36.2
+ requests==2.32.5
+ rfc3339-validator==0.1.4
+ rich==14.1.0
+ rich-rst==1.3.1
+ rpds-py==0.27.1
+ six==1.17.0
+ sniffio==1.3.1
+ sse-starlette==3.0.2
+ starlette==0.47.3
+ typing-extensions==4.15.0
+ typing-inspection==0.4.1
+ urllib3==2.5.0
+ uvicorn==0.35.0
+ werkzeug==3.1.1

Server codelink image 14

	
%%writefile MCP_durability_server/server.py
"""
MCP Durability Server
MPC server that implements durability for long-running agents using Resource links.
Allows long-running operations to survive server restarts and provides state tracking
outside of band.
Implemented pattern:
1. Tools return resource links immediately
2. Background processing continues asynchronously
3. Clients can poll or subscribe to resources for state updates
Usage:
python server.py
"""
import asyncio
import json
import sqlite3
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional
from fastmcp import FastMCP
from fastmcp.server.context import Context
class TaskStatus(Enum):
"""Long running task status."""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
@dataclass
class TaskResult:
"""Result data for a completed task."""
task_id: str
status: TaskStatus
progress: float = 0.0
total: Optional[float] = None
message: Optional[str] = None
result_data: Optional[Dict[str, Any]] = None
error: Optional[str] = None
created_at: float = 0.0
updated_at: float = 0.0
completed_at: Optional[float] = None
class DurableTaskManager:
"""Manages persistent task state and background execution."""
def __init__(self, db_path: str = "mcp_tasks.db"):
self.db_path = db_path
self._background_tasks: Dict[str, asyncio.Task] = {}
self._setup_database()
def _setup_database(self) -> None:
"""Initializes SQLite database for task persistence."""
conn = sqlite3.connect(self.db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS tasks (
task_id TEXT PRIMARY KEY,
status TEXT NOT NULL,
progress REAL DEFAULT 0.0,
total REAL,
message TEXT,
result_data TEXT,
error TEXT,
created_at REAL NOT NULL,
updated_at REAL NOT NULL,
completed_at REAL
)
""")
conn.commit()
conn.close()
async def create_task(self, task_id: Optional[str] = None) -> str:
"""Creates a new task and returns its ID."""
if not task_id:
task_id = str(uuid.uuid4())
current_time = time.time()
task_result = TaskResult(
task_id=task_id,
status=TaskStatus.PENDING,
created_at=current_time,
updated_at=current_time
)
await self._save_task(task_result)
return task_id
async def start_background_task(
self,
task_id: str,
task_function,
context: Optional[Context] = None
) -> None:
"""Starts a background task and tracks its execution."""
async def wrapper():
try:
await self._update_task_status(task_id, TaskStatus.RUNNING)
# Execute the real task
if context:
result = await task_function(task_id, context, self)
else:
result = await task_function(task_id, self)
# Mark as completed with results
await self._update_task_completion(task_id, result)
except asyncio.CancelledError:
await self._update_task_status(task_id, TaskStatus.CANCELLED)
raise
except Exception as e:
await self._update_task_error(task_id, str(e))
finally:
# Clean up background task reference
self._background_tasks.pop(task_id, None)
# Start the task in the background
task = asyncio.create_task(wrapper())
self._background_tasks[task_id] = task
async def _save_task(self, task_result: TaskResult) -> None:
"""Saves the task result to the database."""
conn = sqlite3.connect(self.db_path)
conn.execute("""
INSERT OR REPLACE INTO tasks
(task_id, status, progress, total, message, result_data, error,
created_at, updated_at, completed_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
task_result.task_id,
task_result.status.value,
task_result.progress,
task_result.total,
task_result.message,
json.dumps(task_result.result_data) if task_result.result_data else None,
task_result.error,
task_result.created_at,
task_result.updated_at,
task_result.completed_at
))
conn.commit()
conn.close()
async def get_task(self, task_id: str) -> Optional[TaskResult]:
"""Retrieves the task result from the database."""
conn = sqlite3.connect(self.db_path)
cursor = conn.execute("""
SELECT task_id, status, progress, total, message, result_data, error,
created_at, updated_at, completed_at
FROM tasks WHERE task_id = ?
""", (task_id,))
row = cursor.fetchone()
conn.close()
if not row:
return None
result_data = None
if row[5]: # result_data
try:
result_data = json.loads(row[5])
except json.JSONDecodeError:
pass
return TaskResult(
task_id=row[0],
status=TaskStatus(row[1]),
progress=row[2],
total=row[3],
message=row[4],
result_data=result_data,
error=row[6],
created_at=row[7],
updated_at=row[8],
completed_at=row[9]
)
async def update_task_progress(
self,
task_id: str,
progress: float,
total: Optional[float] = None,
message: Optional[str] = None
) -> None:
"""Updates the task progress."""
task_result = await self.get_task(task_id)
if not task_result:
return
task_result.progress = progress
if total is not None:
task_result.total = total
if message is not None:
task_result.message = message
task_result.updated_at = time.time()
await self._save_task(task_result)
async def _update_task_status(self, task_id: str, status: TaskStatus) -> None:
"""Updates the task status."""
task_result = await self.get_task(task_id)
if not task_result:
return
task_result.status = status
task_result.updated_at = time.time()
await self._save_task(task_result)
async def _update_task_completion(self, task_id: str, result_data: Any) -> None:
"""Marks the task as completed with results."""
task_result = await self.get_task(task_id)
if not task_result:
return
task_result.status = TaskStatus.COMPLETED
task_result.progress = task_result.total or 100.0
task_result.result_data = result_data if isinstance(result_data, dict) else {"result": result_data}
task_result.completed_at = time.time()
task_result.updated_at = time.time()
await self._save_task(task_result)
async def _update_task_error(self, task_id: str, error: str) -> None:
"""Marks the task as failed with error."""
task_result = await self.get_task(task_id)
if not task_result:
return
task_result.status = TaskStatus.FAILED
task_result.error = error
task_result.updated_at = time.time()
await self._save_task(task_result)
async def cancel_task(self, task_id: str) -> bool:
"""Cancels a background task in execution."""
if task_id in self._background_tasks:
task = self._background_tasks[task_id]
task.cancel()
await self._update_task_status(task_id, TaskStatus.CANCELLED)
return True
return False
async def list_tasks(self, status_filter: Optional[TaskStatus] = None) -> List[TaskResult]:
"""Lists all tasks, optionally filtered by status."""
conn = sqlite3.connect(self.db_path)
if status_filter:
cursor = conn.execute("""
SELECT task_id, status, progress, total, message, result_data, error,
created_at, updated_at, completed_at
FROM tasks WHERE status = ? ORDER BY updated_at DESC
""", (status_filter.value,))
else:
cursor = conn.execute("""
SELECT task_id, status, progress, total, message, result_data, error,
created_at, updated_at, completed_at
FROM tasks ORDER BY updated_at DESC
""")
rows = cursor.fetchall()
conn.close()
tasks = []
for row in rows:
result_data = None
if row[5]:
try:
result_data = json.loads(row[5])
except json.JSONDecodeError:
pass
tasks.append(TaskResult(
task_id=row[0],
status=TaskStatus(row[1]),
progress=row[2],
total=row[3],
message=row[4],
result_data=result_data,
error=row[6],
created_at=row[7],
updated_at=row[8],
completed_at=row[9]
))
return tasks
# Create the MCP server with durability capabilities
server = FastMCP(
name="MCP Durability Server",
instructions="An MCP server that demonstrates durability for long-running agents"
)
# Initialize the task manager
task_manager = DurableTaskManager()
# Resources to track the state of tasks
@server.resource("task://status/{task_id}")
async def get_task_status(task_id: str) -> str:
"""Gets the current status of a task by ID."""
task_result = await task_manager.get_task(task_id)
if not task_result:
return json.dumps({"error": "Task not found", "task_id": task_id})
data = {
"task_id": task_result.task_id,
"status": task_result.status.value,
"progress": task_result.progress,
"total": task_result.total,
"message": task_result.message,
"result_data": task_result.result_data,
"error": task_result.error,
"created_at": task_result.created_at,
"updated_at": task_result.updated_at,
"completed_at": task_result.completed_at,
}
return json.dumps(data)
@server.resource("task://list")
async def list_all_tasks() -> str:
"""Lists all tasks with their current status."""
tasks = await task_manager.list_tasks()
data = {
"tasks": [
{
"task_id": task.task_id,
"status": task.status.value,
"progress": task.progress,
"total": task.total,
"message": task.message,
"created_at": task.created_at,
"updated_at": task.updated_at,
"completed_at": task.completed_at,
"has_result": task.result_data is not None,
"has_error": task.error is not None
} for task in tasks
],
"total": len(tasks)
}
return json.dumps(data)
@server.resource("task://list/{status}")
async def list_tasks_by_status(status: str) -> str:
"""Lists tasks filtered by status."""
try:
status_enum = TaskStatus(status.lower())
tasks = await task_manager.list_tasks(status_enum)
data = {
"tasks": [
{
"task_id": task.task_id,
"status": task.status.value,
"progress": task.progress,
"total": task.total,
"message": task.message,
"created_at": task.created_at,
"updated_at": task.updated_at,
"completed_at": task.completed_at
} for task in tasks
],
"status_filter": status,
"total": len(tasks)
}
return json.dumps(data)
except ValueError:
data = {
"error": f"Invalid status: {status}",
"valid_statuses": [s.value for s in TaskStatus]
}
return json.dumps(data)
# Tools to demonstrate durability functionality
@server.tool
async def start_data_migration(
source_path: str,
destination_path: str,
ctx: Context,
record_count: int = 1000
) -> str:
"""
Starts a long-running data migration task.
Args:
source_path: Source path of the data
destination_path: Destination path of the data
record_count: Number of records to migrate
ctx: Contexto MCP
Returns:
Resource URI to track progress
"""
await ctx.info(f"Starting migration of {record_count} records from {source_path} to {destination_path}")
print(f"Starting migration of {record_count} records from {source_path} to {destination_path}")
async def migration_task(task_id: str, context: Context, manager: DurableTaskManager):
"""Simulated data migration task."""
batch_size = 100
migrated = 0
for batch_start in range(0, record_count, batch_size):
# Simulate batch processing
await asyncio.sleep(1) # Simulate work
batch_end = min(batch_start + batch_size, record_count)
migrated = batch_end
# Update progress
progress_pct = (migrated / record_count) * 100
message = f"Migrated {migrated}/{record_count} records"
await manager.update_task_progress(task_id, progress_pct, 100.0, message)
try:
await context.report_progress(migrated, record_count, message)
except Exception:
pass
try:
await context.info(f"Data migration completed: {migrated} records")
except Exception:
pass
return {
"migrated_records": migrated,
"total_records": record_count,
"source_path": source_path,
"destination_path": destination_path,
"success": True,
"completion_time": time.time()
}
# Create the task and start background processing
task_id = await task_manager.create_task()
await task_manager.start_background_task(task_id, migration_task, ctx)
resource_link = f"task://status/{task_id}"
await ctx.info(f"Data migration started. Track progress at: {resource_link}")
return resource_link
@server.tool
async def start_batch_processing(
batch_size: int,
total_items: int,
ctx: Context,
processing_delay: float = 10
) -> str:
"""
Starts a batch processing task.
Args:
batch_size: Size of each batch
total_items: Total number of items to process
processing_delay: Delay per batch in seconds
ctx: MCP context
Returns:
Resource URI to track progress
"""
await ctx.info(f"Starting batch processing: {total_items} items in batches of {batch_size}")
async def batch_task(task_id: str, context: Context, manager: DurableTaskManager):
"""Batch processing task."""
processed = 0
for i in range(0, total_items, batch_size):
await asyncio.sleep(processing_delay) # Simulate work
batch_end = min(i + batch_size, total_items)
processed = batch_end
progress = (processed / total_items) * 100
message = f"Processed {processed}/{total_items} items"
await manager.update_task_progress(task_id, progress, 100.0, message)
try:
await context.report_progress(processed, total_items, message)
except Exception:
pass
try:
await context.info(f"Batch processing completed: {processed} items")
except Exception:
pass
return {
"processed": processed,
"total": total_items,
"batch_size": batch_size,
"processing_delay": processing_delay,
"success": True
}
task_id = await task_manager.create_task()
await task_manager.start_background_task(task_id, batch_task, ctx)
resource_link = f"task://status/{task_id}"
await ctx.info(f"Batch processing started. Track progress at: {resource_link}")
return resource_link
@server.tool
async def start_ml_training(
model_name: str,
ctx: Context,
dataset_size: int = 10000,
epochs: int = 100
) -> str:
"""
Simulates machine learning model training.
Args:
model_name: Name of the model to train
dataset_size: Size of the dataset
epochs: Number of training epochs
ctx: MCP context
Returns:
Resource URI to track progress
"""
await ctx.info(f"Starting training of model '{model_name}' with {dataset_size} samples by {epochs} epochs")
async def training_task(task_id: str, context: Context, manager: DurableTaskManager):
"""Simulated ML training task."""
for epoch in range(1, epochs + 1):
# Simulate training of an epoch
await asyncio.sleep(10)
# Simulate training metrics
loss = 1.0 - (epoch / epochs) * 0.8 + (epoch % 10) * 0.01
accuracy = (epoch / epochs) * 0.95 + (epoch % 5) * 0.002
progress = (epoch / epochs) * 100
message = f"Epoch {epoch}/{epochs} - Loss: {loss:.4f}, Accuracy: {accuracy:.4f}"
await manager.update_task_progress(task_id, progress, 100.0, message)
try:
await context.report_progress(epoch, epochs, message)
except Exception:
pass
try:
await context.info(f"Model '{model_name}' training completed")
except Exception:
pass
return {
"model_name": model_name,
"dataset_size": dataset_size,
"epochs_completed": epochs,
"final_loss": loss,
"final_accuracy": accuracy,
"success": True,
"training_time_seconds": epochs * 0.5
}
task_id = await task_manager.create_task()
await task_manager.start_background_task(task_id, training_task, ctx)
resource_link = f"task://status/{task_id}"
await ctx.info(f"Training started. Track progress at: {resource_link}")
return resource_link
@server.tool
async def cancel_task(task_id: str, ctx: Context) -> str:
"""
Cancels an in-progress task.
Args:
task_id: ID of the task to cancel
ctx: MCP context
Returns:
Status message of the cancellation
"""
success = await task_manager.cancel_task(task_id)
if success:
await ctx.info(f"Task {task_id} cancelled successfully")
return f"Task {task_id} cancelled. Status available at: task://status/{task_id}"
else:
await ctx.warning(f"Task {task_id} not found or cannot be cancelled")
return f"Task {task_id} not found or cannot be cancelled"
@server.tool
async def get_server_stats(ctx: Context) -> Dict[str, Any]:
"""
Gets durability server statistics.
Args:
ctx: MCP context
Returns:
Server statistics
"""
all_tasks = await task_manager.list_tasks()
stats = {
"total_tasks": len(all_tasks),
"running_tasks": len([t for t in all_tasks if t.status == TaskStatus.RUNNING]),
"completed_tasks": len([t for t in all_tasks if t.status == TaskStatus.COMPLETED]),
"failed_tasks": len([t for t in all_tasks if t.status == TaskStatus.FAILED]),
"cancelled_tasks": len([t for t in all_tasks if t.status == TaskStatus.CANCELLED]),
"pending_tasks": len([t for t in all_tasks if t.status == TaskStatus.PENDING]),
"active_background_tasks": len(task_manager._background_tasks),
}
await ctx.info(f"Server statistics: {stats['total_tasks']} total tasks, {stats['running_tasks']} running")
return stats
if __name__ == "__main__":
print("Durability MCP server started")
print("Available tools:")
print(" - start_data_migration: Starts data migration")
print(" - start_batch_processing: Starts batch processing")
print(" - start_ml_training: Simulates ML training")
print(" - cancel_task: Cancels a task")
print(" - get_server_stats: Gets server statistics")
print(" Resources available:")
print(" - task://status/{task_id}: Specific task status")
print(" - task://list: Lists all tasks")
print(" - task://list/{status}: Lists tasks by status")
server.run(transport="http", host="127.0.0.1", port=8080)
Copy
	
Writing MCP_durability_server/server.py

MCP client with durabilitylink image 15

The client will be very similar to the one we made in the posts streamable MCP and resumable MCP.

Implementation of the MCP client with durabilitylink image 16

There will be a DurabilityClient class that will have methods to connect to the server, start the three possible tasks (data migration, data processing, and model training), obtain the status of a task, obtain a list of all tasks, or cancel a task.

Create virtual customer environmentlink image 17

First, we create the folder where we are going to develop it.

	
!mkdir MCP_durability_client
Copy

We create the environment with uv

	
!cd MCP_durability_client && uv init .
Copy
	
Initialized project `mcp-durability-client` at `/Users/macm1/Documents/web/portafolio/posts/MCP_durability_client`

We started it

	
!cd MCP_durability_client && uv venv
Copy
	
Using CPython 3.12.8
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate

We install the necessary libraries.

	
!cd MCP_durability_client && uv add fastmcp
Copy
	
Resolved 64 packages in 17ms
Installed 61 packages in 80ms2
+ annotated-types==0.7.0
+ anyio==4.10.0
+ attrs==25.3.0
+ authlib==1.6.3
+ certifi==2025.8.3
+ cffi==1.17.1
+ charset-normalizer==3.4.3
+ click==8.2.1
+ cryptography==45.0.6
+ cyclopts==3.23.0
+ dnspython==2.7.0
+ docstring-parser==0.17.0
+ docutils==0.22
+ email-validator==2.3.0
+ exceptiongroup==1.3.0
+ fastmcp==2.11.3
+ h11==0.16.0
+ httpcore==1.0.9
+ httpx==0.28.1
+ httpx-sse==0.4.1
+ idna==3.10
+ isodate==0.7.2
+ jsonschema==4.25.1
+ jsonschema-path==0.3.4
+ jsonschema-specifications==2025.4.1
+ lazy-object-proxy==1.12.0
+ markdown-it-py==4.0.0
+ markupsafe==3.0.2
+ mcp==1.13.1
+ mdurl==0.1.2
+ more-itertools==10.7.0
+ openapi-core==0.19.5
+ openapi-pydantic==0.5.1
+ openapi-schema-validator==0.6.3
+ openapi-spec-validator==0.7.2
+ parse==1.20.2
+ pathable==0.4.4
+ pycparser==2.22
+ pydantic==2.11.7
+ pydantic-core==2.33.2
+ pydantic-settings==2.10.1
+ pygments==2.19.2
+ pyperclip==1.9.0
+ python-dotenv==1.1.1
+ python-multipart==0.0.20
+ pyyaml==6.0.2
+ referencing==0.36.2
+ requests==2.32.5
+ rfc3339-validator==0.1.4
+ rich==14.1.0
+ rich-rst==1.3.1
+ rpds-py==0.27.1
+ six==1.17.0
+ sniffio==1.3.1
+ sse-starlette==3.0.2
+ starlette==0.47.3
+ typing-extensions==4.15.0
+ typing-inspection==0.4.1
+ urllib3==2.5.0
+ uvicorn==0.35.0
+ werkzeug==3.1.1

Customer codelink image 18

	
%%writefile MCP_durability_client/client.py
"""
Durability MCP client
Client that demonstrates how to interact with an MCP server that implements durability.
Shows how to:
1. Start long-running tasks
2. Monitor progress using resource polling
3. Subscribe to status updates (simulated)
4. Handle persistent tasks that survive server restarts
Usage:
python client.py
"""
import asyncio
import json
import time
from typing import Any, Dict, List, Optional
from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport
import argparse
class DurabilityClient:
"""Client that demonstrates durability patterns with MCP."""
def __init__(self, server_command: List[str]):
"""
Initializes the durability client.
Args:
server_command: Command to execute the MCP server
"""
self.server_command = server_command
self._polling_tasks: Dict[str, asyncio.Task] = {}
async def connect(self) -> Client:
"""Creates and connects the client to the server."""
transport = StreamableHttpTransport(
url="http://127.0.0.1:8080/mcp"
)
client = Client(transport)
return client
async def start_data_migration(
self,
client: Client,
source_path: str = "/data/source",
destination_path: str = "/data/destination",
record_count: int = 1000
) -> str:
"""
Starts a data migration task and returns the resource link.
Args:
client: MCP connected client
source_path: Source path
destination_path: Destination path
record_count: Number of records to migrate
Returns:
Resource URI to track progress
"""
print(f"🚀 Starting data migration: {record_count} records")
print(f" Source: {source_path}")
print(f" Destination: {destination_path}")
result = await client.call_tool(
"start_data_migration",
{
"source_path": source_path,
"destination_path": destination_path,
"record_count": record_count
}
)
resource_uri = result.content[0].text
print(f"✅ Task started. Resource URI: {resource_uri}")
return resource_uri
async def start_batch_processing(
self,
client: Client,
batch_size: int = 50,
total_items: int = 500,
processing_delay: float = 0.3
) -> str:
"""
Starts batch processing.
Args:
client: MCP connected client
batch_size: Batch size
total_items: Total items
processing_delay: Processing delay
Returns:
Resource URI to track progress
"""
print(f"🔄 Starting batch processing:")
print(f" Total items: {total_items}")
print(f" Batch size: {batch_size}")
print(f" Processing delay: {processing_delay}s")
result = await client.call_tool(
"start_batch_processing",
{
"batch_size": batch_size,
"total_items": total_items,
"processing_delay": processing_delay
}
)
resource_uri = result.content[0].text
print(f"✅ Batch processing started. Resource URI: {resource_uri}")
return resource_uri
async def start_ml_training(
self,
client: Client,
model_name: str = "clasificador-texto",
dataset_size: int = 5000,
epochs: int = 50
) -> str:
"""
Starts ML model training.
Args:
client: MCP connected client
model_name: Model name
dataset_size: Dataset size
epochs: Number of epochs
Returns:
Resource URI to track progress
"""
print(f"🤖 Starting ML training:")
print(f" Model: {model_name}")
print(f" Dataset size: {dataset_size} samples")
print(f" Epochs: {epochs}")
result = await client.call_tool(
"start_ml_training",
{
"model_name": model_name,
"dataset_size": dataset_size,
"epochs": epochs
}
)
resource_uri = result.content[0].text
print(f"✅ ML training started. Resource URI: {resource_uri}")
return resource_uri
async def get_task_status(self, client: Client, resource_uri: str) -> Dict[str, Any]:
"""
Gets the current status of a task.
Args:
client: MCP connected client
resource_uri: Resource URI of the task
Returns:
Current status of the task
"""
try:
result = await client.read_resource(resource_uri)
# result should be a list of ReadResourceContents
if isinstance(result, list) and len(result) > 0:
content_block = result[0]
# The content is in the 'text' attribute
if hasattr(content_block, 'text') and content_block.text:
return json.loads(content_block.text)
else:
return {"error": f"No text found in the resource. Type: {type(content_block)}, text={getattr(content_block, 'text', None)}"}
else:
return {"error": "Empty resource"}
except Exception as e:
return {"error": f"Error reading resource: {str(e)}"}
async def poll_task_until_complete(
self,
client: Client,
resource_uri: str,
poll_interval: float = 1.0,
max_duration: float = 10.0
) -> Dict[str, Any]:
"""
Polls a task until it completes or fails.
Args:
client: MCP connected client
resource_uri: Resource URI of the task
poll_interval: Interval between polls in seconds
max_duration: Maximum duration of polling
Returns:
Final status of the task
"""
task_id = resource_uri.split("/")[-1]
print(f"📊 Monitoring task {task_id}...")
start_time = time.time()
last_progress = -1
while time.time() - start_time < max_duration:
status = await self.get_task_status(client, resource_uri)
if "error" in status and status["error"] is not None:
print(f"Error getting status: {status['error']}")
return status
current_status = status.get("status", "unknown")
progress = status.get("progress", 0)
message = status.get("message", "")
# Show updates only if the progress changed
if progress != last_progress:
progress_bar = self._create_progress_bar(progress)
print(f" {progress_bar} {progress:5.1f}% | {current_status.upper()} | {message}")
last_progress = progress
# Check if the task is complete
if current_status in ["completed", "failed", "cancelled"]:
print(f"Task {current_status}: {task_id}")
if current_status == "completed" and status.get("result_data"):
print("Results:")
self._print_results(status["result_data"])
elif current_status == "failed" and status.get("error"):
print(f"Error: {status['error']}")
return status
await asyncio.sleep(poll_interval)
print(f"Timeout for task {task_id}")
return await self.get_task_status(client, resource_uri)
def _create_progress_bar(self, progress: float, width: int = 20) -> str:
"""Creates a visual progress bar."""
filled = int(width * progress / 100)
bar = "█" * filled + "░" * (width - filled)
return f"[{bar}]"
def _print_results(self, result_data: Dict[str, Any], indent: int = 4) -> None:
"""Prints the results in a formatted way."""
spaces = " " * indent
for key, value in result_data.items():
if isinstance(value, dict):
print(f"{spaces}{key}:")
self._print_results(value, indent + 2)
elif isinstance(value, list):
print(f"{spaces}{key}: {len(value)} items")
else:
print(f"{spaces}{key}: {value}")
async def list_all_tasks(self, client: Client) -> Dict[str, Any]:
"""Lists all tasks on the server."""
try:
result = await client.read_resource("task://list")
# result should be a list of ReadResourceContents
if isinstance(result, list) and len(result) > 0:
content_block = result[0]
# The content is in the 'text' attribute
if hasattr(content_block, 'text') and content_block.text:
return json.loads(content_block.text)
else:
return {"error": f"No text found in the resource. Type: {type(content_block)}"}
else:
return {"error": "Empty resource"}
except Exception as e:
return {"error": f"Error reading task list: {str(e)}"}
async def list_tasks_by_status(self, client: Client, status: str) -> Dict[str, Any]:
"""Lists tasks filtered by status."""
try:
result = await client.read_resource(f"task://list/{status}")
# result should be a list of ReadResourceContents
if isinstance(result, list) and len(result) > 0:
content_block = result[0]
# The content is in the 'text' attribute
if hasattr(content_block, 'text') and content_block.text:
return json.loads(content_block.text)
else:
return {"error": f"No text found in the resource. Type: {type(content_block)}"}
else:
return {"error": "Empty resource"}
except Exception as e:
return {"error": f"Error reading task list: {str(e)}"}
async def print_task_list(self, client: Client) -> None:
"""Prints a formatted list of all tasks."""
print("Task list:")
task_list = await self.list_all_tasks(client)
if "error" in task_list:
print(f" {task_list['error']}")
return
tasks = task_list.get("tasks", [])
if not tasks:
print(" No tasks")
return
for task in tasks:
task_id = task["task_id"]
status = task["status"].upper()
progress = task.get("progress", 0)
message = task.get("message", "")
print(f" ID: {task_id} | {status:9} | {progress:5.1f}% | {message}")
async def select_task(self, client: Client) -> Optional[str]:
"""
Shows a numbered list of tasks and allows the user to select one.
Args:
client: MCP connected client
Returns:
The ID of the selected task, or None if no task was selected
"""
print("📋 Available tasks:")
task_list = await self.list_all_tasks(client)
if "error" in task_list:
print(f" ❌ {task_list['error']}")
return None
tasks = task_list.get("tasks", [])
if not tasks:
print(" No tasks available")
return None
# Show tasks with numbers
for i, task in enumerate(tasks, 1):
task_id = task["task_id"]
status = task["status"].upper()
progress = task.get("progress", 0)
message = task.get("message", "")
print(f" {i}. ID: {task_id} | {status:9} | {progress:5.1f}% | {message}")
# Ask user to select
try:
choice = input(f" Select a task (1-{len(tasks)}) or press Enter to cancel: ").strip()
if not choice:
print("Selection cancelled")
return None
choice_num = int(choice)
if 1 <= choice_num <= len(tasks):
selected_task = tasks[choice_num - 1]
selected_id = selected_task["task_id"]
print(f"✅ Selected task: {selected_id}")
return selected_id
else:
print(f"❌ Invalid selection. Please choose between 1 and {len(tasks)}")
return None
except ValueError:
print("❌ Invalid input. Please enter a number")
return None
except Exception as e:
print(f"❌ Error selecting task: {str(e)}")
return None
async def cancel_task(self, client: Client, task_id: str) -> str:
"""Cancels a specific task."""
try:
result = await client.call_tool("cancel_task", {"task_id": task_id})
response = result.content[0].text
print(f"🚫 {response}")
return response
except Exception as e:
error_msg = f"Error canceling task: {str(e)}"
print(f"❌ {error_msg}")
return error_msg
async def get_server_stats(self, client: Client) -> Dict[str, Any]:
"""Gets server statistics."""
try:
result = await client.call_tool("get_server_stats", {})
# result.content is a list of ContentBlock
if hasattr(result, 'content') and len(result.content) > 0:
content = result.content[0]
if hasattr(content, 'text'):
return json.loads(content.text)
else:
# If not text, it may be the object directly
return content if isinstance(content, dict) else {"error": f"Unexpected format: {type(content)}"}
else:
return {"error": "Empty response from get_server_stats"}
except Exception as e:
return {"error": f"Error getting server statistics: {str(e)}"}
async def print_server_stats(self, client: Client) -> None:
"""Prints server statistics."""
print("📊 Server statistics:")
stats = await self.get_server_stats(client)
if "error" in stats:
print(f" ❌ {stats['error']}")
return
print(f" Total tasks: {stats.get('total_tasks', 0)}")
print(f" Running: {stats.get('running_tasks', 0)}")
print(f" Completed: {stats.get('completed_tasks', 0)}")
print(f" Failed: {stats.get('failed_tasks', 0)}")
print(f" Cancelled: {stats.get('cancelled_tasks', 0)}")
print(f" Pending: {stats.get('pending_tasks', 0)}")
print(f" Active background tasks: {stats.get('active_background_tasks', 0)}")
async def interactive_demo(server_path: Optional[str] = None):
"""Interactive demo to explore the system."""
print("🚀 Interactive Demo of the MCP Durability System")
print("=" * 55)
client_manager = DurabilityClient([])
async with await client_manager.connect() as client:
print("🔗 Connected to the durability server")
while True:
print(" " + "=" * 40)
print("Available options:")
print("1. Start data migration")
print("2. Start batch processing")
print("3. Start ML training")
print("4. View server statistics")
print("5. View task list")
print("6. Monitor specific task")
print("7. Cancel task")
print("8. Exit")
try:
choice = input(" Select an option (1-8): ").strip()
if choice == "1":
records = int(input("Number of records to migrate (default 500): ") or "500")
uri = await client_manager.start_data_migration(client, record_count=records)
print(f"Task URI: {uri}")
elif choice == "2":
total = int(input("Total elements (default 300): ") or "300")
batch_size = int(input("Batch size (default 30): ") or "30")
uri = await client_manager.start_batch_processing(client, total_items=total, batch_size=batch_size)
print(f"Task URI: {uri}")
elif choice == "3":
model_name = input("Model name (default 'demo-model'): ") or "demo-model"
epochs = int(input("Number of epochs (default 25): ") or "25")
uri = await client_manager.start_ml_training(client, model_name=model_name, epochs=epochs)
print(f"Task URI: {uri}")
elif choice == "4":
await client_manager.print_server_stats(client)
elif choice == "5":
await client_manager.print_task_list(client)
elif choice == "6":
task_id = await client_manager.select_task(client)
if task_id:
uri = f"task://status/{task_id}"
await client_manager.poll_task_until_complete(client, uri)
elif choice == "7":
task_id = await client_manager.select_task(client)
if task_id:
await client_manager.cancel_task(client, task_id)
elif choice == "8":
print("👋 Bye!")
break
else:
print("❌ Invalid option. Please select 1-8.")
except KeyboardInterrupt:
print(" 👋 Demo interrupted. Bye!")
break
except Exception as e:
print(f"❌ Error: {str(e)}")
if __name__ == "__main__":
try:
asyncio.run(interactive_demo())
except KeyboardInterrupt:
print(" 👋 Bye!")
except Exception as e:
print(f"❌ Error: {str(e)}")
Copy
	
Writing MCP_durability_client/client.py

Executionlink image 19

First, we start the server.

	
!cd MCP_durability_server && source .venv/bin/activate && uv run server.py
Copy
	
Durability MCP server started
Available tools:
- start_data_migration: Starts data migration
- start_batch_processing: Starts batch processing
- start_ml_training: Simulates ML training
- cancel_task: Cancels a task
- get_server_stats: Gets server statistics
Resources available:
- task://status/{task_id}: Specific task status
- task://list: Lists all tasks
- task://list/{status}: Lists tasks by status
╭─ FastMCP 2.0 ──────────────────────────────────────────────────────────────╮
│ │
│ _ __ ___ ______ __ __ _____________ ____ ____ │
│ _ __ ___ / ____/___ ______/ /_/ |/ / ____/ __ |___ / __ │
│ _ __ ___ / /_ / __ `/ ___/ __/ /|_/ / / / /_/ / ___/ / / / / / │
│ _ __ ___ / __/ / /_/ (__ ) /_/ / / / /___/ ____/ / __/_/ /_/ / │
│ _ __ ___ /_/ __,_/____/__/_/ /_/____/_/ /_____(_)____/ │
│ │
│ │
│ │
│ 🖥️ Server name: MCP Durability Server │
│ 📦 Transport: Streamable-HTTP │
│ 🔗 Server URL: http://127.0.0.1:8080/mcp │
│ │
│ 📚 Docs: https://gofastmcp.com │
│ 🚀 Deploy: https://fastmcp.cloud │
│ │
│ 🏎️ FastMCP version: 2.11.3 │
│ 🤝 MCP version: 1.13.1 │
│ │
╰────────────────────────────────────────────────────────────────────────────╯
[08/28/25 11:46:08] INFO Starting MCP server 'MCP Durability ]8;id=396160;file:///Users/macm1/Documents/web/portafolio/posts/MCP_durability_server/.venv/lib/python3.12/site-packages/fastmcp/server/server.py\server.py]8;;\:]8;id=646262;file:///Users/macm1/Documents/web/portafolio/posts/MCP_durability_server/.venv/lib/python3.12/site-packages/fastmcp/server/server.py#1522\1522]8;;\
Server' with transport 'http' on
http://127.0.0.1:8080/mcp
INFO: Started server process [55234]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8080 (Press CTRL+C to quit)

Now we run the client

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Copy
	
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server
========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8):

First, we select option 5 to see that there are no tasks running.

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Copy
	
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server
========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 5
Task list:
No tasks
========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8):

We see that it outputs

Task list:
No tasks

There are no tasks running

Now we select option 1 to start the data migration task and select 100,000 samples to ensure that the task lasts long enough to do everything we need to do in the post.

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Copy
	
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 1
Number of records to migrate (default 500): 100000
🚀 Starting data migration: 100000 records
Source: /data/source
Destination: /data/destination
[08/28/25 12:05:22] INFO Server log: Starting migration of logging.py:40
100000 records from /data/source to
/data/destination
INFO Server log: Data migration started. logging.py:40
Track progress at:
task://status/821a210a-8672-4eba-b27c
-500bf63e58c1
✅ Task started. Resource URI: task://status/821a210a-8672-4eba-b27c-500bf63e58c1
Task URI: task://status/821a210a-8672-4eba-b27c-500bf63e58c1

We select option 5 to see the list of tasks.

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Copy
	
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 5
Task list:
ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING | 1.0% | Migrated 1000/100000 records

Now the task we just requested from the MCP server appears. We can see its ID and status. It carries 1,000 pieces of data out of 100,000.

We select option 8 to terminate the client.

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Copy
	
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 8
👋 Bye!

We run the client again.

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Copy
	
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server
========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8):

We select option 5 again to see the list of tasks running on the server.

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Copy
	
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server
========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 5
Task list:
ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING | 3.4% | Migrated 3400/100000 records

We see that the task has continued to run and that it has processed more data. Before, it had processed 1,000, and now it has processed 3,400.

Now we select option 6 to monitor the task we have requested from the server. This will allow us to monitor it for 10 seconds.

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Copy
	
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 6
📋 Available tasks:
1. ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING | 4.2% | Migrated 4200/100000 records
Select a task (1-1) or press Enter to cancel: 1
✅ Selected task: 821a210a-8672-4eba-b27c-500bf63e58c1
📊 Monitoring task 821a210a-8672-4eba-b27c-500bf63e58c1...
[░░░░░░░░░░░░░░░░░░░░] 4.9% | RUNNING | Migrated 4900/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.0% | RUNNING | Migrated 5000/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.1% | RUNNING | Migrated 5100/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.2% | RUNNING | Migrated 5200/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.3% | RUNNING | Migrated 5300/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.4% | RUNNING | Migrated 5400/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.5% | RUNNING | Migrated 5500/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.6% | RUNNING | Migrated 5600/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.7% | RUNNING | Migrated 5700/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.8% | RUNNING | Migrated 5800/100000 records
Timeout for task 821a210a-8672-4eba-b27c-500bf63e58c1

Finally, we cancel the task using option 7.

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Copy
	
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 7
📋 Available tasks:
1. ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING | 6.1% | Migrated 6100/100000 records
Select a task (1-1) or press Enter to cancel: 1
✅ Selected task: 821a210a-8672-4eba-b27c-500bf63e58c1
[08/28/25 12:06:25] INFO Server log: Task logging.py:40
821a210a-8672-4eba-b27c-500bf63e58c1
cancelled successfully
🚫 Task 821a210a-8672-4eba-b27c-500bf63e58c1 cancelled. Status available at: task://status/821a210a-8672-4eba-b27c-500bf63e58c1

Now we select option 5 again to see the list of tasks.

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Copy
	
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 5
Task list:
ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | CANCELLED | 6.3% | Migrated 6300/100000 records

We see that the task we requested from the server appears as canceled (CANCELLED).

Continue reading

Stream Information in MCP: Complete Guide to Real-time Progress Updates with FastMCP

Stream Information in MCP: Complete Guide to Real-time Progress Updates with FastMCP

Learn how to implement real-time streaming in MCP (Model Context Protocol) applications using FastMCP. This comprehensive guide shows you how to create MCP servers and clients that support progress updates and streaming information for long-running tasks. You'll build streaming-enabled tools that provide real-time feedback during data processing, file uploads, monitoring tasks, and other time-intensive operations. Discover how to use StreamableHttpTransport, implement progress handlers with Context, and create visual progress bars that enhance user experience when working with MCP applications that require continuous feedback.

Last posts -->

Have you seen these projects?

Horeca chatbot

Horeca chatbot Horeca chatbot
Python
LangChain
PostgreSQL
PGVector
React
Kubernetes
Docker
GitHub Actions

Chatbot conversational for cooks of hotels and restaurants. A cook, kitchen manager or room service of a hotel or restaurant can talk to the chatbot to get information about recipes and menus. But it also implements agents, with which it can edit or create new recipes or menus

Subtify

Subtify Subtify
Python
Whisper
Spaces

Subtitle generator for videos in the language you want. Also, it puts a different color subtitle to each person

View all projects -->

Do you want to apply AI in your project? Contact me!

Do you want to improve with these tips?

Last tips -->

Use this locally

Hugging Face spaces allow us to run models with very simple demos, but what if the demo breaks? Or if the user deletes it? That's why I've created docker containers with some interesting spaces, to be able to use them locally, whatever happens. In fact, if you click on any project view button, it may take you to a space that doesn't work.

Flow edit

Flow edit Flow edit

FLUX.1-RealismLora

FLUX.1-RealismLora FLUX.1-RealismLora
View all containers -->

Do you want to apply AI in your project? Contact me!

Do you want to train your model with these datasets?

short-jokes-dataset

Dataset with jokes in English

opus100

Dataset with translations from English to Spanish

netflix_titles

Dataset with Netflix movies and series

View more datasets -->