In this post, we will build an MCP server and client where the client can request a long-running task from the server, disconnect, let the task keep running on the server, reconnect later, and check the task's status.
MCP server with durability
The server will be similar to the ones we built in the streamable MCP and resumable MCP posts, with one difference: we are now adding the ability for the client to request a long-running task, disconnect while the task keeps running on the server, and then reconnect to check the task's status.
Furthermore, in this case we are going to store the tasks in an SQLite database to make the setup more robust, since in previous posts we saved this information in JSON files.
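As a minimal sketch of this idea (simplified from the full server below; the table here is a reduced, hypothetical version of the real schema), persisting task state in SQLite means a task's status survives client disconnects and server restarts:

```python
import sqlite3
import time
import uuid

# Simplified tasks table: the real server below stores more columns
# (total, message, result_data, error, created_at, completed_at).
conn = sqlite3.connect(":memory:")  # the post uses a file, e.g. mcp_tasks.db
conn.execute("""
    CREATE TABLE IF NOT EXISTS tasks (
        task_id TEXT PRIMARY KEY,
        status TEXT NOT NULL,
        progress REAL DEFAULT 0.0,
        updated_at REAL NOT NULL
    )
""")

# Upsert the current state of a task
task_id = str(uuid.uuid4())
conn.execute(
    "INSERT OR REPLACE INTO tasks VALUES (?, ?, ?, ?)",
    (task_id, "running", 42.0, time.time()),
)
conn.commit()

# Any later connection (e.g. a reconnected client) can read the state back
status, progress = conn.execute(
    "SELECT status, progress FROM tasks WHERE task_id = ?", (task_id,)
).fetchone()
print(status, progress)  # running 42.0
```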
Implementation of the MCP server with durability
The server will have a DurableTaskManager class that manages task creation, persists tasks to the database, reports their status, and can cancel them.
It will expose three resources: one to get the status of a task, one to list all tasks, and one to list tasks by status.
It will expose five tools: three that launch specific long-running tasks (data migration, batch processing, and model training), one to cancel a task, and one to get server statistics.
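The core pattern behind these tools can be sketched without MCP at all. In this hypothetical stand-in (the names `start_task` and `long_running_work` are illustrative, not the FastMCP API), the "tool" returns a resource link immediately while the work keeps running in the background and updates shared state:

```python
import asyncio
import uuid

# In-memory stand-in for the SQLite-backed state used by the real server
TASKS: dict = {}

async def long_running_work(task_id: str, steps: int) -> None:
    """Background job that periodically updates its own progress."""
    for i in range(1, steps + 1):
        await asyncio.sleep(0.01)  # simulate one batch of work
        TASKS[task_id]["progress"] = i / steps * 100
    TASKS[task_id]["status"] = "completed"

async def start_task(steps: int = 5) -> str:
    """Tool-like entry point: returns a resource link right away."""
    task_id = str(uuid.uuid4())
    TASKS[task_id] = {"status": "running", "progress": 0.0}
    # Schedule the work WITHOUT awaiting it; keep a handle so the task
    # is not garbage-collected (the real server keeps _background_tasks).
    TASKS[task_id]["handle"] = asyncio.create_task(long_running_work(task_id, steps))
    return f"task://status/{task_id}"

async def main():
    link = await start_task()
    task_id = link.split("/")[-1]
    before = TASKS[task_id]["status"]  # the link returns while work is running
    await asyncio.sleep(0.2)           # the work finishes in the background
    after = TASKS[task_id]["status"]
    return before, after

before, after = asyncio.run(main())
print(before, after)  # running completed
```

The real server follows the same shape, with the state dictionary replaced by the SQLite-backed DurableTaskManager.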
Create the server virtual environment
First, we create the folder where we are going to develop it.
!mkdir MCP_durability_server
We create the environment with uv
!cd MCP_durability_server && uv init .
Initialized project `mcp-durability-server` at `/Users/macm1/Documents/web/portafolio/posts/MCP_durability_server`
We create the virtual environment
!cd MCP_durability_server && uv venv
Using CPython 3.12.8
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate
We install the necessary libraries.
!cd MCP_durability_server && uv add fastmcp
Resolved 64 packages in 344ms
Installed 61 packages in 136ms
 + annotated-types==0.7.0
 + anyio==4.10.0
 + attrs==25.3.0
 + authlib==1.6.3
 + certifi==2025.8.3
 + cffi==1.17.1
 + charset-normalizer==3.4.3
 + click==8.2.1
 + cryptography==45.0.6
 + cyclopts==3.23.0
 + dnspython==2.7.0
 + docstring-parser==0.17.0
 + docutils==0.22
 + email-validator==2.3.0
 + exceptiongroup==1.3.0
 + fastmcp==2.11.3
 + h11==0.16.0
 + httpcore==1.0.9
 + httpx==0.28.1
 + httpx-sse==0.4.1
 + idna==3.10
 + isodate==0.7.2
 + jsonschema==4.25.1
 + jsonschema-path==0.3.4
 + jsonschema-specifications==2025.4.1
 + lazy-object-proxy==1.12.0
 + markdown-it-py==4.0.0
 + markupsafe==3.0.2
 + mcp==1.13.1
 + mdurl==0.1.2
 + more-itertools==10.7.0
 + openapi-core==0.19.5
 + openapi-pydantic==0.5.1
 + openapi-schema-validator==0.6.3
 + openapi-spec-validator==0.7.2
 + parse==1.20.2
 + pathable==0.4.4
 + pycparser==2.22
 + pydantic==2.11.7
 + pydantic-core==2.33.2
 + pydantic-settings==2.10.1
 + pygments==2.19.2
 + pyperclip==1.9.0
 + python-dotenv==1.1.1
 + python-multipart==0.0.20
 + pyyaml==6.0.2
 + referencing==0.36.2
 + requests==2.32.5
 + rfc3339-validator==0.1.4
 + rich==14.1.0
 + rich-rst==1.3.1
 + rpds-py==0.27.1
 + six==1.17.0
 + sniffio==1.3.1
 + sse-starlette==3.0.2
 + starlette==0.47.3
 + typing-extensions==4.15.0
 + typing-inspection==0.4.1
 + urllib3==2.5.0
 + uvicorn==0.35.0
 + werkzeug==3.1.1
Server code
%%writefile MCP_durability_server/server.py
"""MCP Durability Server

MCP server that implements durability for long-running agents using resource links.
Allows long-running operations to survive server restarts and provides out-of-band
state tracking.

Implemented pattern:
1. Tools return resource links immediately
2. Background processing continues asynchronously
3. Clients can poll or subscribe to resources for state updates

Usage:
    python server.py
"""
import asyncio
import json
import sqlite3
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional

from fastmcp import FastMCP
from fastmcp.server.context import Context


class TaskStatus(Enum):
    """Long-running task status."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


@dataclass
class TaskResult:
    """Result data for a completed task."""
    task_id: str
    status: TaskStatus
    progress: float = 0.0
    total: Optional[float] = None
    message: Optional[str] = None
    result_data: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    created_at: float = 0.0
    updated_at: float = 0.0
    completed_at: Optional[float] = None


class DurableTaskManager:
    """Manages persistent task state and background execution."""

    def __init__(self, db_path: str = "mcp_tasks.db"):
        self.db_path = db_path
        self._background_tasks: Dict[str, asyncio.Task] = {}
        self._setup_database()

    def _setup_database(self) -> None:
        """Initializes SQLite database for task persistence."""
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                task_id TEXT PRIMARY KEY,
                status TEXT NOT NULL,
                progress REAL DEFAULT 0.0,
                total REAL,
                message TEXT,
                result_data TEXT,
                error TEXT,
                created_at REAL NOT NULL,
                updated_at REAL NOT NULL,
                completed_at REAL
            )
        """)
        conn.commit()
        conn.close()

    async def create_task(self, task_id: Optional[str] = None) -> str:
        """Creates a new task and returns its ID."""
        if not task_id:
            task_id = str(uuid.uuid4())
        current_time = time.time()
        task_result = TaskResult(
            task_id=task_id,
            status=TaskStatus.PENDING,
            created_at=current_time,
            updated_at=current_time
        )
        await self._save_task(task_result)
        return task_id

    async def start_background_task(
        self,
        task_id: str,
        task_function,
        context: Optional[Context] = None
    ) -> None:
        """Starts a background task and tracks its execution."""
        async def wrapper():
            try:
                await self._update_task_status(task_id, TaskStatus.RUNNING)
                # Execute the real task
                if context:
                    result = await task_function(task_id, context, self)
                else:
                    result = await task_function(task_id, self)
                # Mark as completed with results
                await self._update_task_completion(task_id, result)
            except asyncio.CancelledError:
                await self._update_task_status(task_id, TaskStatus.CANCELLED)
                raise
            except Exception as e:
                await self._update_task_error(task_id, str(e))
            finally:
                # Clean up background task reference
                self._background_tasks.pop(task_id, None)

        # Start the task in the background
        task = asyncio.create_task(wrapper())
        self._background_tasks[task_id] = task

    async def _save_task(self, task_result: TaskResult) -> None:
        """Saves the task result to the database."""
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            INSERT OR REPLACE INTO tasks
            (task_id, status, progress, total, message, result_data, error,
             created_at, updated_at, completed_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            task_result.task_id,
            task_result.status.value,
            task_result.progress,
            task_result.total,
            task_result.message,
            json.dumps(task_result.result_data) if task_result.result_data else None,
            task_result.error,
            task_result.created_at,
            task_result.updated_at,
            task_result.completed_at
        ))
        conn.commit()
        conn.close()

    async def get_task(self, task_id: str) -> Optional[TaskResult]:
        """Retrieves the task result from the database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.execute("""
            SELECT task_id, status, progress, total, message, result_data, error,
                   created_at, updated_at, completed_at
            FROM tasks WHERE task_id = ?
        """, (task_id,))
        row = cursor.fetchone()
        conn.close()
        if not row:
            return None
        result_data = None
        if row[5]:  # result_data
            try:
                result_data = json.loads(row[5])
            except json.JSONDecodeError:
                pass
        return TaskResult(
            task_id=row[0],
            status=TaskStatus(row[1]),
            progress=row[2],
            total=row[3],
            message=row[4],
            result_data=result_data,
            error=row[6],
            created_at=row[7],
            updated_at=row[8],
            completed_at=row[9]
        )

    async def update_task_progress(
        self,
        task_id: str,
        progress: float,
        total: Optional[float] = None,
        message: Optional[str] = None
    ) -> None:
        """Updates the task progress."""
        task_result = await self.get_task(task_id)
        if not task_result:
            return
        task_result.progress = progress
        if total is not None:
            task_result.total = total
        if message is not None:
            task_result.message = message
        task_result.updated_at = time.time()
        await self._save_task(task_result)

    async def _update_task_status(self, task_id: str, status: TaskStatus) -> None:
        """Updates the task status."""
        task_result = await self.get_task(task_id)
        if not task_result:
            return
        task_result.status = status
        task_result.updated_at = time.time()
        await self._save_task(task_result)

    async def _update_task_completion(self, task_id: str, result_data: Any) -> None:
        """Marks the task as completed with results."""
        task_result = await self.get_task(task_id)
        if not task_result:
            return
        task_result.status = TaskStatus.COMPLETED
        task_result.progress = task_result.total or 100.0
        task_result.result_data = result_data if isinstance(result_data, dict) else {"result": result_data}
        task_result.completed_at = time.time()
        task_result.updated_at = time.time()
        await self._save_task(task_result)

    async def _update_task_error(self, task_id: str, error: str) -> None:
        """Marks the task as failed with error."""
        task_result = await self.get_task(task_id)
        if not task_result:
            return
        task_result.status = TaskStatus.FAILED
        task_result.error = error
        task_result.updated_at = time.time()
        await self._save_task(task_result)

    async def cancel_task(self, task_id: str) -> bool:
        """Cancels a background task in execution."""
        if task_id in self._background_tasks:
            task = self._background_tasks[task_id]
            task.cancel()
            await self._update_task_status(task_id, TaskStatus.CANCELLED)
            return True
        return False

    async def list_tasks(self, status_filter: Optional[TaskStatus] = None) -> List[TaskResult]:
        """Lists all tasks, optionally filtered by status."""
        conn = sqlite3.connect(self.db_path)
        if status_filter:
            cursor = conn.execute("""
                SELECT task_id, status, progress, total, message, result_data, error,
                       created_at, updated_at, completed_at
                FROM tasks WHERE status = ? ORDER BY updated_at DESC
            """, (status_filter.value,))
        else:
            cursor = conn.execute("""
                SELECT task_id, status, progress, total, message, result_data, error,
                       created_at, updated_at, completed_at
                FROM tasks ORDER BY updated_at DESC
            """)
        rows = cursor.fetchall()
        conn.close()
        tasks = []
        for row in rows:
            result_data = None
            if row[5]:
                try:
                    result_data = json.loads(row[5])
                except json.JSONDecodeError:
                    pass
            tasks.append(TaskResult(
                task_id=row[0],
                status=TaskStatus(row[1]),
                progress=row[2],
                total=row[3],
                message=row[4],
                result_data=result_data,
                error=row[6],
                created_at=row[7],
                updated_at=row[8],
                completed_at=row[9]
            ))
        return tasks


# Create the MCP server with durability capabilities
server = FastMCP(
    name="MCP Durability Server",
    instructions="An MCP server that demonstrates durability for long-running agents"
)

# Initialize the task manager
task_manager = DurableTaskManager()


# Resources to track the state of tasks
@server.resource("task://status/{task_id}")
async def get_task_status(task_id: str) -> str:
    """Gets the current status of a task by ID."""
    task_result = await task_manager.get_task(task_id)
    if not task_result:
        return json.dumps({"error": "Task not found", "task_id": task_id})
    data = {
        "task_id": task_result.task_id,
        "status": task_result.status.value,
        "progress": task_result.progress,
        "total": task_result.total,
        "message": task_result.message,
        "result_data": task_result.result_data,
        "error": task_result.error,
        "created_at": task_result.created_at,
        "updated_at": task_result.updated_at,
        "completed_at": task_result.completed_at,
    }
    return json.dumps(data)


@server.resource("task://list")
async def list_all_tasks() -> str:
    """Lists all tasks with their current status."""
    tasks = await task_manager.list_tasks()
    data = {
        "tasks": [{
            "task_id": task.task_id,
            "status": task.status.value,
            "progress": task.progress,
            "total": task.total,
            "message": task.message,
            "created_at": task.created_at,
            "updated_at": task.updated_at,
            "completed_at": task.completed_at,
            "has_result": task.result_data is not None,
            "has_error": task.error is not None
        } for task in tasks],
        "total": len(tasks)
    }
    return json.dumps(data)


@server.resource("task://list/{status}")
async def list_tasks_by_status(status: str) -> str:
    """Lists tasks filtered by status."""
    try:
        status_enum = TaskStatus(status.lower())
        tasks = await task_manager.list_tasks(status_enum)
        data = {
            "tasks": [{
                "task_id": task.task_id,
                "status": task.status.value,
                "progress": task.progress,
                "total": task.total,
                "message": task.message,
                "created_at": task.created_at,
                "updated_at": task.updated_at,
                "completed_at": task.completed_at
            } for task in tasks],
            "status_filter": status,
            "total": len(tasks)
        }
        return json.dumps(data)
    except ValueError:
        data = {
            "error": f"Invalid status: {status}",
            "valid_statuses": [s.value for s in TaskStatus]
        }
        return json.dumps(data)


# Tools to demonstrate durability functionality
@server.tool
async def start_data_migration(
    source_path: str,
    destination_path: str,
    ctx: Context,
    record_count: int = 1000
) -> str:
    """Starts a long-running data migration task.

    Args:
        source_path: Source path of the data
        destination_path: Destination path of the data
        record_count: Number of records to migrate
        ctx: MCP context

    Returns:
        Resource URI to track progress
    """
    await ctx.info(f"Starting migration of {record_count} records from {source_path} to {destination_path}")
    print(f"Starting migration of {record_count} records from {source_path} to {destination_path}")

    async def migration_task(task_id: str, context: Context, manager: DurableTaskManager):
        """Simulated data migration task."""
        batch_size = 100
        migrated = 0
        for batch_start in range(0, record_count, batch_size):
            # Simulate batch processing
            await asyncio.sleep(1)  # Simulate work
            batch_end = min(batch_start + batch_size, record_count)
            migrated = batch_end
            # Update progress
            progress_pct = (migrated / record_count) * 100
            message = f"Migrated {migrated}/{record_count} records"
            await manager.update_task_progress(task_id, progress_pct, 100.0, message)
            try:
                await context.report_progress(migrated, record_count, message)
            except Exception:
                pass
        try:
            await context.info(f"Data migration completed: {migrated} records")
        except Exception:
            pass
        return {
            "migrated_records": migrated,
            "total_records": record_count,
            "source_path": source_path,
            "destination_path": destination_path,
            "success": True,
            "completion_time": time.time()
        }

    # Create the task and start background processing
    task_id = await task_manager.create_task()
    await task_manager.start_background_task(task_id, migration_task, ctx)
    resource_link = f"task://status/{task_id}"
    await ctx.info(f"Data migration started. Track progress at: {resource_link}")
    return resource_link


@server.tool
async def start_batch_processing(
    batch_size: int,
    total_items: int,
    ctx: Context,
    processing_delay: float = 10
) -> str:
    """Starts a batch processing task.

    Args:
        batch_size: Size of each batch
        total_items: Total number of items to process
        processing_delay: Delay per batch in seconds
        ctx: MCP context

    Returns:
        Resource URI to track progress
    """
    await ctx.info(f"Starting batch processing: {total_items} items in batches of {batch_size}")

    async def batch_task(task_id: str, context: Context, manager: DurableTaskManager):
        """Batch processing task."""
        processed = 0
        for i in range(0, total_items, batch_size):
            await asyncio.sleep(processing_delay)  # Simulate work
            batch_end = min(i + batch_size, total_items)
            processed = batch_end
            progress = (processed / total_items) * 100
            message = f"Processed {processed}/{total_items} items"
            await manager.update_task_progress(task_id, progress, 100.0, message)
            try:
                await context.report_progress(processed, total_items, message)
            except Exception:
                pass
        try:
            await context.info(f"Batch processing completed: {processed} items")
        except Exception:
            pass
        return {
            "processed": processed,
            "total": total_items,
            "batch_size": batch_size,
            "processing_delay": processing_delay,
            "success": True
        }

    task_id = await task_manager.create_task()
    await task_manager.start_background_task(task_id, batch_task, ctx)
    resource_link = f"task://status/{task_id}"
    await ctx.info(f"Batch processing started. Track progress at: {resource_link}")
    return resource_link


@server.tool
async def start_ml_training(
    model_name: str,
    ctx: Context,
    dataset_size: int = 10000,
    epochs: int = 100
) -> str:
    """Simulates machine learning model training.

    Args:
        model_name: Name of the model to train
        dataset_size: Size of the dataset
        epochs: Number of training epochs
        ctx: MCP context

    Returns:
        Resource URI to track progress
    """
    await ctx.info(f"Starting training of model '{model_name}' with {dataset_size} samples by {epochs} epochs")

    async def training_task(task_id: str, context: Context, manager: DurableTaskManager):
        """Simulated ML training task."""
        for epoch in range(1, epochs + 1):
            # Simulate training of an epoch
            await asyncio.sleep(10)
            # Simulate training metrics
            loss = 1.0 - (epoch / epochs) * 0.8 + (epoch % 10) * 0.01
            accuracy = (epoch / epochs) * 0.95 + (epoch % 5) * 0.002
            progress = (epoch / epochs) * 100
            message = f"Epoch {epoch}/{epochs} - Loss: {loss:.4f}, Accuracy: {accuracy:.4f}"
            await manager.update_task_progress(task_id, progress, 100.0, message)
            try:
                await context.report_progress(epoch, epochs, message)
            except Exception:
                pass
        try:
            await context.info(f"Model '{model_name}' training completed")
        except Exception:
            pass
        return {
            "model_name": model_name,
            "dataset_size": dataset_size,
            "epochs_completed": epochs,
            "final_loss": loss,
            "final_accuracy": accuracy,
            "success": True,
            "training_time_seconds": epochs * 0.5
        }

    task_id = await task_manager.create_task()
    await task_manager.start_background_task(task_id, training_task, ctx)
    resource_link = f"task://status/{task_id}"
    await ctx.info(f"Training started. Track progress at: {resource_link}")
    return resource_link


@server.tool
async def cancel_task(task_id: str, ctx: Context) -> str:
    """Cancels an in-progress task.

    Args:
        task_id: ID of the task to cancel
        ctx: MCP context

    Returns:
        Status message of the cancellation
    """
    success = await task_manager.cancel_task(task_id)
    if success:
        await ctx.info(f"Task {task_id} cancelled successfully")
        return f"Task {task_id} cancelled. Status available at: task://status/{task_id}"
    else:
        await ctx.warning(f"Task {task_id} not found or cannot be cancelled")
        return f"Task {task_id} not found or cannot be cancelled"


@server.tool
async def get_server_stats(ctx: Context) -> Dict[str, Any]:
    """Gets durability server statistics.

    Args:
        ctx: MCP context

    Returns:
        Server statistics
    """
    all_tasks = await task_manager.list_tasks()
    stats = {
        "total_tasks": len(all_tasks),
        "running_tasks": len([t for t in all_tasks if t.status == TaskStatus.RUNNING]),
        "completed_tasks": len([t for t in all_tasks if t.status == TaskStatus.COMPLETED]),
        "failed_tasks": len([t for t in all_tasks if t.status == TaskStatus.FAILED]),
        "cancelled_tasks": len([t for t in all_tasks if t.status == TaskStatus.CANCELLED]),
        "pending_tasks": len([t for t in all_tasks if t.status == TaskStatus.PENDING]),
        "active_background_tasks": len(task_manager._background_tasks),
    }
    await ctx.info(f"Server statistics: {stats['total_tasks']} total tasks, {stats['running_tasks']} running")
    return stats


if __name__ == "__main__":
    print("Durability MCP server started")
    print("Available tools:")
    print("  - start_data_migration: Starts data migration")
    print("  - start_batch_processing: Starts batch processing")
    print("  - start_ml_training: Simulates ML training")
    print("  - cancel_task: Cancels a task")
    print("  - get_server_stats: Gets server statistics")
    print("Resources available:")
    print("  - task://status/{task_id}: Specific task status")
    print("  - task://list: Lists all tasks")
    print("  - task://list/{status}: Lists tasks by status")
    server.run(transport="http", host="127.0.0.1", port=8080)
Writing MCP_durability_server/server.py
MCP client with durability
The client will be very similar to the ones we built in the streamable MCP and resumable MCP posts.
Implementation of the MCP client with durability
There will be a DurabilityClient class with methods to connect to the server, start the three available tasks (data migration, batch processing, and model training), get the status of a task, list all tasks, and cancel a task.
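The heart of the client is a polling loop: read the task's status resource, print progress, and stop once the task reaches a terminal state. A minimal sketch of that loop (the `read_status` callable and `poll_until_complete` helper are illustrative stand-ins for the client's resource reads, not the FastMCP API):

```python
import asyncio

# Terminal states after which polling should stop
TERMINAL = {"completed", "failed", "cancelled"}

async def poll_until_complete(read_status, poll_interval=0.01, max_polls=100):
    """Poll a status source until the task reaches a terminal state."""
    for _ in range(max_polls):
        status = read_status()
        if status["status"] in TERMINAL:
            return status
        await asyncio.sleep(poll_interval)
    return {"status": "timeout"}

# Fake status source that reports "running" twice, then "completed",
# standing in for successive reads of task://status/{task_id}
reads = iter(
    [{"status": "running", "progress": p} for p in (10, 60)]
    + [{"status": "completed", "progress": 100}]
)
final = asyncio.run(poll_until_complete(lambda: next(reads)))
print(final["status"])  # completed
```

Because the status lives in the server's database rather than in the connection, this loop works the same way after the client disconnects and reconnects.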
Create the client virtual environment
First, we create the folder where we are going to develop it.
!mkdir MCP_durability_client
We create the environment with uv
!cd MCP_durability_client && uv init .
Initialized project `mcp-durability-client` at `/Users/macm1/Documents/web/portafolio/posts/MCP_durability_client`
We create the virtual environment
!cd MCP_durability_client && uv venv
Using CPython 3.12.8
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate
We install the necessary libraries.
!cd MCP_durability_client && uv add fastmcp
Resolved 64 packages in 17ms
Installed 61 packages in 80ms
 + annotated-types==0.7.0
 + anyio==4.10.0
 + attrs==25.3.0
 + authlib==1.6.3
 + certifi==2025.8.3
 + cffi==1.17.1
 + charset-normalizer==3.4.3
 + click==8.2.1
 + cryptography==45.0.6
 + cyclopts==3.23.0
 + dnspython==2.7.0
 + docstring-parser==0.17.0
 + docutils==0.22
 + email-validator==2.3.0
 + exceptiongroup==1.3.0
 + fastmcp==2.11.3
 + h11==0.16.0
 + httpcore==1.0.9
 + httpx==0.28.1
 + httpx-sse==0.4.1
 + idna==3.10
 + isodate==0.7.2
 + jsonschema==4.25.1
 + jsonschema-path==0.3.4
 + jsonschema-specifications==2025.4.1
 + lazy-object-proxy==1.12.0
 + markdown-it-py==4.0.0
 + markupsafe==3.0.2
 + mcp==1.13.1
 + mdurl==0.1.2
 + more-itertools==10.7.0
 + openapi-core==0.19.5
 + openapi-pydantic==0.5.1
 + openapi-schema-validator==0.6.3
 + openapi-spec-validator==0.7.2
 + parse==1.20.2
 + pathable==0.4.4
 + pycparser==2.22
 + pydantic==2.11.7
 + pydantic-core==2.33.2
 + pydantic-settings==2.10.1
 + pygments==2.19.2
 + pyperclip==1.9.0
 + python-dotenv==1.1.1
 + python-multipart==0.0.20
 + pyyaml==6.0.2
 + referencing==0.36.2
 + requests==2.32.5
 + rfc3339-validator==0.1.4
 + rich==14.1.0
 + rich-rst==1.3.1
 + rpds-py==0.27.1
 + six==1.17.0
 + sniffio==1.3.1
 + sse-starlette==3.0.2
 + starlette==0.47.3
 + typing-extensions==4.15.0
 + typing-inspection==0.4.1
 + urllib3==2.5.0
 + uvicorn==0.35.0
 + werkzeug==3.1.1
Client code
%%writefile MCP_durability_client/client.py
"""Durability MCP client

Client that demonstrates how to interact with an MCP server that implements durability.

Shows how to:
1. Start long-running tasks
2. Monitor progress using resource polling
3. Subscribe to status updates (simulated)
4. Handle persistent tasks that survive server restarts

Usage:
    python client.py
"""
import argparse
import asyncio
import json
import time
from typing import Any, Dict, List, Optional

from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport


class DurabilityClient:
    """Client that demonstrates durability patterns with MCP."""

    def __init__(self, server_command: List[str]):
        """Initializes the durability client.

        Args:
            server_command: Command to execute the MCP server
        """
        self.server_command = server_command
        self._polling_tasks: Dict[str, asyncio.Task] = {}

    async def connect(self) -> Client:
        """Creates and connects the client to the server."""
        transport = StreamableHttpTransport(url="http://127.0.0.1:8080/mcp")
        client = Client(transport)
        return client

    async def start_data_migration(
        self,
        client: Client,
        source_path: str = "/data/source",
        destination_path: str = "/data/destination",
        record_count: int = 1000
    ) -> str:
        """Starts a data migration task and returns the resource link.

        Args:
            client: Connected MCP client
            source_path: Source path
            destination_path: Destination path
            record_count: Number of records to migrate

        Returns:
            Resource URI to track progress
        """
        print(f"🚀 Starting data migration: {record_count} records")
        print(f"   Source: {source_path}")
        print(f"   Destination: {destination_path}")
        result = await client.call_tool(
            "start_data_migration",
            {
                "source_path": source_path,
                "destination_path": destination_path,
                "record_count": record_count
            }
        )
        resource_uri = result.content[0].text
        print(f"✅ Task started. Resource URI: {resource_uri}")
        return resource_uri

    async def start_batch_processing(
        self,
        client: Client,
        batch_size: int = 50,
        total_items: int = 500,
        processing_delay: float = 0.3
    ) -> str:
        """Starts batch processing.

        Args:
            client: Connected MCP client
            batch_size: Batch size
            total_items: Total items
            processing_delay: Processing delay

        Returns:
            Resource URI to track progress
        """
        print(f"🔄 Starting batch processing:")
        print(f"   Total items: {total_items}")
        print(f"   Batch size: {batch_size}")
        print(f"   Processing delay: {processing_delay}s")
        result = await client.call_tool(
            "start_batch_processing",
            {
                "batch_size": batch_size,
                "total_items": total_items,
                "processing_delay": processing_delay
            }
        )
        resource_uri = result.content[0].text
        print(f"✅ Batch processing started. Resource URI: {resource_uri}")
        return resource_uri

    async def start_ml_training(
        self,
        client: Client,
        model_name: str = "clasificador-texto",
        dataset_size: int = 5000,
        epochs: int = 50
    ) -> str:
        """Starts ML model training.

        Args:
            client: Connected MCP client
            model_name: Model name
            dataset_size: Dataset size
            epochs: Number of epochs

        Returns:
            Resource URI to track progress
        """
        print(f"🤖 Starting ML training:")
        print(f"   Model: {model_name}")
        print(f"   Dataset size: {dataset_size} samples")
        print(f"   Epochs: {epochs}")
        result = await client.call_tool(
            "start_ml_training",
            {
                "model_name": model_name,
                "dataset_size": dataset_size,
                "epochs": epochs
            }
        )
        resource_uri = result.content[0].text
        print(f"✅ ML training started. Resource URI: {resource_uri}")
        return resource_uri

    async def get_task_status(self, client: Client, resource_uri: str) -> Dict[str, Any]:
        """Gets the current status of a task.

        Args:
            client: Connected MCP client
            resource_uri: Resource URI of the task

        Returns:
            Current status of the task
        """
        try:
            result = await client.read_resource(resource_uri)
            # result should be a list of ReadResourceContents
            if isinstance(result, list) and len(result) > 0:
                content_block = result[0]
                # The content is in the 'text' attribute
                if hasattr(content_block, 'text') and content_block.text:
                    return json.loads(content_block.text)
                else:
                    return {"error": f"No text found in the resource. Type: {type(content_block)}, text={getattr(content_block, 'text', None)}"}
            else:
                return {"error": "Empty resource"}
        except Exception as e:
            return {"error": f"Error reading resource: {str(e)}"}

    async def poll_task_until_complete(
        self,
        client: Client,
        resource_uri: str,
        poll_interval: float = 1.0,
        max_duration: float = 10.0
    ) -> Dict[str, Any]:
        """Polls a task until it completes or fails.

        Args:
            client: Connected MCP client
            resource_uri: Resource URI of the task
            poll_interval: Interval between polls in seconds
            max_duration: Maximum duration of polling

        Returns:
            Final status of the task
        """
        task_id = resource_uri.split("/")[-1]
        print(f"📊 Monitoring task {task_id}...")
        start_time = time.time()
        last_progress = -1
        while time.time() - start_time < max_duration:
            status = await self.get_task_status(client, resource_uri)
            if "error" in status and status["error"] is not None:
                print(f"Error getting status: {status['error']}")
                return status
            current_status = status.get("status", "unknown")
            progress = status.get("progress", 0)
            message = status.get("message", "")
            # Show updates only if the progress changed
            if progress != last_progress:
                progress_bar = self._create_progress_bar(progress)
                print(f"   {progress_bar} {progress:5.1f}% | {current_status.upper()} | {message}")
                last_progress = progress
            # Check if the task is complete
            if current_status in ["completed", "failed", "cancelled"]:
                print(f"Task {current_status}: {task_id}")
                if current_status == "completed" and status.get("result_data"):
                    print("Results:")
                    self._print_results(status["result_data"])
                elif current_status == "failed" and status.get("error"):
                    print(f"Error: {status['error']}")
                return status
            await asyncio.sleep(poll_interval)
        print(f"Timeout for task {task_id}")
        return await self.get_task_status(client, resource_uri)

    def _create_progress_bar(self, progress: float, width: int = 20) -> str:
        """Creates a visual progress bar."""
        filled = int(width * progress / 100)
        bar = "█" * filled + "░" * (width - filled)
        return f"[{bar}]"

    def _print_results(self, result_data: Dict[str, Any], indent: int = 4) -> None:
        """Prints the results in a formatted way."""
        spaces = " " * indent
        for key, value in result_data.items():
            if isinstance(value, dict):
                print(f"{spaces}{key}:")
                self._print_results(value, indent + 2)
            elif isinstance(value, list):
                print(f"{spaces}{key}: {len(value)} items")
            else:
                print(f"{spaces}{key}: {value}")

    async def list_all_tasks(self, client: Client) -> Dict[str, Any]:
        """Lists all tasks on the server."""
        try:
            result = await client.read_resource("task://list")
            # result should be a list of ReadResourceContents
            if isinstance(result, list) and len(result) > 0:
                content_block = result[0]
                # The content is in the 'text' attribute
                if hasattr(content_block, 'text') and content_block.text:
                    return json.loads(content_block.text)
                else:
                    return {"error": f"No text found in the resource. Type: {type(content_block)}"}
            else:
                return {"error": "Empty resource"}
        except Exception as e:
            return {"error": f"Error reading task list: {str(e)}"}

    async def list_tasks_by_status(self, client: Client, status: str) -> Dict[str, Any]:
        """Lists tasks filtered by status."""
        try:
            result = await client.read_resource(f"task://list/{status}")
            # result should be a list of ReadResourceContents
            if isinstance(result, list) and len(result) > 0:
                content_block = result[0]
                # The content is in the 'text' attribute
                if hasattr(content_block, 'text') and content_block.text:
                    return json.loads(content_block.text)
                else:
                    return {"error": f"No text found in the resource. Type: {type(content_block)}"}
            else:
                return {"error": "Empty resource"}
        except Exception as e:
            return {"error": f"Error reading task list: {str(e)}"}

    async def print_task_list(self, client: Client) -> None:
        """Prints a formatted list of all tasks."""
        print("Task list:")
        task_list = await self.list_all_tasks(client)
        if "error" in task_list:
            print(f"   {task_list['error']}")
            return
        tasks = task_list.get("tasks", [])
        if not tasks:
            print("   No tasks")
            return
        for task in tasks:
            task_id = task["task_id"]
            status = task["status"].upper()
            progress = task.get("progress", 0)
            message = task.get("message", "")
            print(f"   ID: {task_id} | {status:9} | {progress:5.1f}% | {message}")

    async def select_task(self, client: Client) -> Optional[str]:
        """Shows a numbered list of tasks and allows the user to select one.

        Args:
            client: Connected MCP client

        Returns:
            The ID of the selected task, or None if no task was selected
        """
        print("📋 Available tasks:")
        task_list = await self.list_all_tasks(client)
        if "error" in task_list:
            print(f"   ❌ {task_list['error']}")
            return None
        tasks = task_list.get("tasks", [])
        if not tasks:
            print("   No tasks available")
            return None
        # Show tasks with numbers
        for i, task in enumerate(tasks, 1):
            task_id = task["task_id"]
            status = task["status"].upper()
            progress = task.get("progress", 0)
            message = task.get("message", "")
            print(f"   {i}. ID: {task_id} | {status:9} | {progress:5.1f}% | {message}")
        # Ask user to select
        try:
            choice = input(f"\nSelect a task (1-{len(tasks)}) or press Enter to cancel: ").strip()
            if not choice:
                print("Selection cancelled")
                return None
            choice_num = int(choice)
            if 1 <= choice_num <= len(tasks):
                selected_task = tasks[choice_num - 1]
                selected_id = selected_task["task_id"]
                print(f"✅ Selected task: {selected_id}")
                return selected_id
            else:
                print(f"❌ Invalid selection. Please choose between 1 and {len(tasks)}")
                return None
        except ValueError:
            print("❌ Invalid input. Please enter a number")
            return None
        except Exception as e:
            print(f"❌ Error selecting task: {str(e)}")
            return None

    async def cancel_task(self, client: Client, task_id: str) -> str:
        """Cancels a specific task."""
        try:
            result = await client.call_tool("cancel_task", {"task_id": task_id})
            response = result.content[0].text
            print(f"🚫 {response}")
            return response
        except Exception as e:
            error_msg = f"Error canceling task: {str(e)}"
            print(f"❌ {error_msg}")
            return error_msg

    async def get_server_stats(self, client: Client) -> Dict[str, Any]:
        """Gets server statistics."""
        try:
            result = await client.call_tool("get_server_stats", {})
            # result.content is a list of ContentBlock
            if hasattr(result, 'content') and len(result.content) > 0:
                content = result.content[0]
                if hasattr(content, 'text'):
                    return json.loads(content.text)
                else:
                    # If not text, it may be the object directly
                    return content if isinstance(content, dict) else {"error": f"Unexpected format: {type(content)}"}
            else:
                return {"error": "Empty response from get_server_stats"}
        except Exception as e:
            return {"error": f"Error getting server statistics: {str(e)}"}

    async def print_server_stats(self, client: Client) -> None:
        """Prints server statistics."""
        print("📊 Server statistics:")
        stats = await self.get_server_stats(client)
        if "error" in stats:
            print(f"   ❌ {stats['error']}")
            return
        print(f"   Total tasks: {stats.get('total_tasks', 0)}")
        print(f"   Running: {stats.get('running_tasks', 0)}")
        print(f"   Completed: {stats.get('completed_tasks', 0)}")
        print(f"   Failed: {stats.get('failed_tasks', 0)}")
        print(f"   Cancelled: {stats.get('cancelled_tasks', 0)}")
        print(f"   Pending: {stats.get('pending_tasks', 0)}")
        print(f"   Active background tasks: {stats.get('active_background_tasks', 0)}")


async def interactive_demo(server_path: Optional[str] = None):
    """Interactive demo to explore the system."""
    print("🚀 Interactive Demo of the MCP Durability System")
    print("=" * 55)
    client_manager = DurabilityClient([])
    async with await client_manager.connect() as client:
        print("🔗 Connected to the durability server")
        while True:
            print("\n" + "=" * 40)
            print("Available options:")
            print("1. Start data migration")
            print("2. Start batch processing")
            print("3. Start ML training")
            print("4. View server statistics")
            print("5. View task list")
            print("6. Monitor specific task")
            print("7. Cancel task")
            print("8. Exit")
            try:
                choice = input("\nSelect an option (1-8): ").strip()
                if choice == "1":
                    records = int(input("Number of records to migrate (default 500): ") or "500")
                    uri = await client_manager.start_data_migration(client, record_count=records)
                    print(f"Task URI: {uri}")
                elif choice == "2":
                    total = int(input("Total elements (default 300): ") or "300")
                    batch_size = int(input("Batch size (default 30): ") or "30")
                    uri = await client_manager.start_batch_processing(client, total_items=total, batch_size=batch_size)
                    print(f"Task URI: {uri}")
                elif choice == "3":
                    model_name = input("Model name (default 'demo-model'): ") or "demo-model"
                    epochs = int(input("Number of epochs (default 25): ") or "25")
                    uri = await client_manager.start_ml_training(client, model_name=model_name, epochs=epochs)
                    print(f"Task URI: {uri}")
                elif choice == "4":
                    await client_manager.print_server_stats(client)
                elif choice == "5":
                    await client_manager.print_task_list(client)
                elif choice == "6":
                    task_id = await client_manager.select_task(client)
                    if task_id:
                        uri = f"task://status/{task_id}"
                        await client_manager.poll_task_until_complete(client, uri)
                elif choice == "7":
                    task_id = await client_manager.select_task(client)
                    if task_id:
                        await client_manager.cancel_task(client, task_id)
                elif choice == "8":
                    print("👋 Bye!")
                    break
                else:
                    print("❌ Invalid option. Please select 1-8.")
            except KeyboardInterrupt:
                print("\n👋 Demo interrupted. Bye!")
                break
            except Exception as e:
                print(f"❌ Error: {str(e)}")


if __name__ == "__main__":
    try:
        asyncio.run(interactive_demo())
    except KeyboardInterrupt:
        print("\n👋 Bye!")
    except Exception as e:
        print(f"❌ Error: {str(e)}")
Writing MCP_durability_client/client.py
Execution
First, we start the server.
!cd MCP_durability_server && source .venv/bin/activate && uv run server.py
Durability MCP server started
Available tools:
- start_data_migration: Starts data migration
- start_batch_processing: Starts batch processing
- start_ml_training: Simulates ML training
- cancel_task: Cancels a task
- get_server_stats: Gets server statistics
Resources available:
- task://status/{task_id}: Specific task status
- task://list: Lists all tasks
- task://list/{status}: Lists tasks by status

╭─ FastMCP 2.0 ──────────────────────────────────────────────╮
│                                                            │
│  🖥️  Server name:     MCP Durability Server                │
│  📦 Transport:       Streamable-HTTP                       │
│  🔗 Server URL:      http://127.0.0.1:8080/mcp             │
│                                                            │
│  📚 Docs:            https://gofastmcp.com                 │
│  🚀 Deploy:          https://fastmcp.cloud                 │
│                                                            │
│  🏎️  FastMCP version: 2.11.3                               │
│  🤝 MCP version:     1.13.1                                │
│                                                            │
╰────────────────────────────────────────────────────────────╯

[08/28/25 11:46:08] INFO  Starting MCP server 'MCP Durability Server' with transport 'http' on http://127.0.0.1:8080/mcp   server.py:1522
INFO:     Started server process [55234]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8080 (Press CTRL+C to quit)
Now we run the client.
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server

========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8):
First, we select option 5 to see that there are no tasks running.
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server

========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8): 5
Task list:
No tasks

========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8):
We see that it outputs

Task list:
No tasks

so there are no tasks running on the server.
Now we select option 1 to start the data migration task, choosing 100,000 records so that the task runs long enough for everything we need to do in this post.
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8): 1
Number of records to migrate (default 500): 100000
🚀 Starting data migration: 100000 records
Source: /data/source
Destination: /data/destination
[08/28/25 12:05:22] INFO  Server log: Starting migration of 100000 records from /data/source to /data/destination   logging.py:40
                    INFO  Server log: Data migration started. Track progress at: task://status/821a210a-8672-4eba-b27c-500bf63e58c1   logging.py:40
✅ Task started. Resource URI: task://status/821a210a-8672-4eba-b27c-500bf63e58c1
Task URI: task://status/821a210a-8672-4eba-b27c-500bf63e58c1
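The tool responds with a `task://status/{task_id}` resource URI. The client only needs to keep that URI (or the task ID at its end) to check on the task later, even after disconnecting and reconnecting. As a small sketch, the ID can be pulled out of the URI like this (using the UUID from the run above):

```python
# Recover the task_id from the resource URI returned by the server.
uri = "task://status/821a210a-8672-4eba-b27c-500bf63e58c1"

scheme, _, rest = uri.partition("://")    # split off the "task" scheme
resource, _, task_id = rest.partition("/")  # split "status" from the ID

print(scheme)    # task
print(resource)  # status
print(task_id)   # 821a210a-8672-4eba-b27c-500bf63e58c1
```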
We select option 5 to see the list of tasks.
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8): 5
Task list:
ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING | 1.0% | Migrated 1000/100000 records
Now the task we just requested from the MCP server appears, with its ID and status. It has migrated 1,000 records out of 100,000 so far.
We select option 8 to terminate the client.
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8): 8
👋 Bye!
We run the client again.
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server

========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8):
We select option 5 again to see the list of tasks running on the server.
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server

========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8): 5
Task list:
ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING | 3.4% | Migrated 3400/100000 records
We see that the task kept running on the server while the client was disconnected: before, it had migrated 1,000 records, and now it has migrated 3,400.
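This is the durability part: the server's task manager persists task state in SQLite, so progress survives client disconnects and any client that knows the task ID can recover the latest status. A minimal self-contained sketch of the idea (the table and column names here are illustrative, not the server's actual schema):

```python
import sqlite3
import uuid

# In-memory database for the sketch; the real server uses a file on disk,
# which is what makes the state survive across processes.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tasks (task_id TEXT PRIMARY KEY, status TEXT, progress REAL, message TEXT)"
)

# The tool call creates the task row...
task_id = str(uuid.uuid4())
conn.execute(
    "INSERT INTO tasks VALUES (?, 'RUNNING', 1.0, 'Migrated 1000/100000 records')",
    (task_id,),
)

# ...the background task updates it as it makes progress...
conn.execute(
    "UPDATE tasks SET progress = 3.4, message = 'Migrated 3400/100000 records' WHERE task_id = ?",
    (task_id,),
)
conn.commit()

# ...and a client that reconnects later just reads the up-to-date row.
status, progress = conn.execute(
    "SELECT status, progress FROM tasks WHERE task_id = ?", (task_id,)
).fetchone()
print(status, progress)  # RUNNING 3.4
```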
Now we select option 6 to monitor the task we requested from the server. The client polls the task for 10 seconds and then times out.
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8): 6
📋 Available tasks:
1. ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING | 4.2% | Migrated 4200/100000 records

Select a task (1-1) or press Enter to cancel: 1
✅ Selected task: 821a210a-8672-4eba-b27c-500bf63e58c1
📊 Monitoring task 821a210a-8672-4eba-b27c-500bf63e58c1...
[░░░░░░░░░░░░░░░░░░░░] 4.9% | RUNNING | Migrated 4900/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.0% | RUNNING | Migrated 5000/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.1% | RUNNING | Migrated 5100/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.2% | RUNNING | Migrated 5200/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.3% | RUNNING | Migrated 5300/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.4% | RUNNING | Migrated 5400/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.5% | RUNNING | Migrated 5500/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.6% | RUNNING | Migrated 5600/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.7% | RUNNING | Migrated 5700/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.8% | RUNNING | Migrated 5800/100000 records
Timeout for task 821a210a-8672-4eba-b27c-500bf63e58c1
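Option 6 works by reading the `task://status/{task_id}` resource in a loop, printing a progress bar, and giving up after a timeout, which is why the run above ends with a `Timeout` line. A self-contained sketch of that loop (the bar format matches the output above; the polling helper and the fake status source are illustrative, not the client's exact code):

```python
import time

def render_progress(progress: float, status: str, message: str, width: int = 20) -> str:
    """One monitor line in the same shape as the client output above."""
    filled = int(width * progress / 100)  # e.g. 5.0% of 20 chars -> 1 filled block
    bar = "█" * filled + "░" * (width - filled)
    return f"[{bar}] {progress:.1f}% | {status} | {message}"

def poll_until_complete(get_status, timeout: float = 10.0, interval: float = 0.1):
    """Poll a status callback until the task reaches a final state or we time out."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        s = get_status()
        print(render_progress(s["progress"], s["status"], s["message"]))
        if s["status"] in ("COMPLETED", "FAILED", "CANCELLED"):
            return s
        time.sleep(interval)
    print("Timeout")
    return None

# Fake status source standing in for reading the task://status resource.
states = iter([
    {"progress": 4.9, "status": "RUNNING", "message": "Migrated 4900/100000 records"},
    {"progress": 5.0, "status": "RUNNING", "message": "Migrated 5000/100000 records"},
    {"progress": 100.0, "status": "COMPLETED", "message": "Migration finished"},
])
final = poll_until_complete(lambda: next(states), timeout=2.0)
print(final["status"])  # COMPLETED
```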
Finally, we cancel the task using option 7.
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8): 7
📋 Available tasks:
1. ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING | 6.1% | Migrated 6100/100000 records

Select a task (1-1) or press Enter to cancel: 1
✅ Selected task: 821a210a-8672-4eba-b27c-500bf63e58c1
[08/28/25 12:06:25] INFO  Server log: Task 821a210a-8672-4eba-b27c-500bf63e58c1 cancelled successfully   logging.py:40
🚫 Task 821a210a-8672-4eba-b27c-500bf63e58c1 cancelled. Status available at: task://status/821a210a-8672-4eba-b27c-500bf63e58c1
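On the server side, cancellation boils down to cancelling the asyncio task that runs the job and recording the final `CANCELLED` status. A self-contained sketch of that mechanism (illustrative, not the server's actual `DurableTaskManager` code):

```python
import asyncio

async def long_job():
    """Stand-in for a long-running job loop like the data migration."""
    try:
        while True:
            await asyncio.sleep(0.01)  # one unit of simulated work
    except asyncio.CancelledError:
        # The real server would persist this final status to SQLite here.
        return "CANCELLED"

async def main():
    job = asyncio.create_task(long_job())
    await asyncio.sleep(0.05)  # let the job run for a bit
    job.cancel()               # what the cancel_task tool triggers
    # The coroutine caught the cancellation and returned a final status,
    # so the task completes normally with that value.
    result = await job
    print(result)  # CANCELLED
    return result

asyncio.run(main())
```

Note the design choice: because `long_job` catches `CancelledError` and returns a value instead of re-raising, the task finishes with a result the manager can store, rather than propagating the cancellation to its awaiter.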
Now we select option 5 again to see the list of tasks.
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8): 5
Task list:
ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | CANCELLED | 6.3% | Migrated 6300/100000 records
We see that the task we requested from the server now appears as canceled (CANCELLED), keeping the progress it had reached when it was stopped.