In this post we will see how to build an MCP server and client where the client can ask the server to run a long-running task, disconnect while the task keeps executing on the server, and later reconnect to check the task's status.
MCP server with durability
The server will be similar to the ones we built in the streamable MCP and resumable MCP posts, except that now the client can start a long-running task on the server, disconnect while the task keeps running, and later reconnect to check its status.
Also, this time we will store the tasks in a SQLite database to make it more robust, since in the previous posts we stored the information in JSON files.
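As a quick illustration of why SQLite helps here, this hypothetical snippet shows the shape of the persistence the server will use: each task becomes a row, and `INSERT OR REPLACE` makes saving idempotent. It is a simplified sketch (in-memory database, fewer columns than the real server's schema):

```python
import json
import sqlite3
import time

# Simplified sketch of the server's persistence: each task is a row in a
# SQLite table, so state survives process restarts (unlike in-memory dicts
# or ad-hoc JSON files). The real server uses a file-backed database.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE IF NOT EXISTS tasks (
        task_id TEXT PRIMARY KEY,
        status TEXT NOT NULL,
        progress REAL DEFAULT 0.0,
        result_data TEXT,
        updated_at REAL NOT NULL
    )
""")

# INSERT OR REPLACE makes saving idempotent: the same call creates or updates.
conn.execute(
    "INSERT OR REPLACE INTO tasks VALUES (?, ?, ?, ?, ?)",
    ("demo-task", "completed", 100.0, json.dumps({"migrated": 500}), time.time()),
)
conn.commit()

row = conn.execute(
    "SELECT status, progress, result_data FROM tasks WHERE task_id = ?",
    ("demo-task",),
).fetchone()
print(row[0], row[1], json.loads(row[2]))  # completed 100.0 {'migrated': 500}
```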
Implementing the MCP server with durability
The server will have a DurableTaskManager class
that manages task creation, persists tasks to the database, reports their status, and can cancel them.
It will have three resources: one to get the status of a task, one to list all tasks, and one to list tasks by status.
And it will have five tools: three run concrete tasks (data migration, batch processing, and model training), one cancels a task, and the last one returns server statistics.
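Before diving into the full implementation, the core durability pattern can be sketched in a few lines. This is a hypothetical, in-memory stand-in for what `DurableTaskManager` does (the real server persists state to SQLite): the tool schedules the work in the background and returns a `task://status/{task_id}` resource link immediately, so the caller never blocks on the task.

```python
import asyncio
import uuid

# In-memory task store (the real server persists this state in SQLite).
TASKS: dict[str, dict] = {}
BACKGROUND: set[asyncio.Task] = set()  # keep references so tasks are not GC'd

async def long_running_work(task_id: str) -> None:
    """Simulated long-running job that updates its own status as it runs."""
    TASKS[task_id]["status"] = "running"
    for step in range(5):
        await asyncio.sleep(0.01)  # simulate one unit of work
        TASKS[task_id]["progress"] = (step + 1) / 5 * 100
    TASKS[task_id]["status"] = "completed"

async def start_task() -> str:
    """Like a durable tool: schedule the work and return a resource link at once."""
    task_id = str(uuid.uuid4())
    TASKS[task_id] = {"status": "pending", "progress": 0.0}
    task = asyncio.create_task(long_running_work(task_id))
    BACKGROUND.add(task)
    task.add_done_callback(BACKGROUND.discard)
    return f"task://status/{task_id}"  # returned before the work finishes

async def main() -> str:
    link = await start_task()
    task_id = link.rsplit("/", 1)[-1]
    # The caller holds the link while the task is still pending or running
    assert TASKS[task_id]["status"] in ("pending", "running")
    await asyncio.sleep(0.2)  # a client could disconnect and reconnect here
    return TASKS[task_id]["status"]

print(asyncio.run(main()))  # completed
```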
Create the server's virtual environment
First we create the folder where we will develop it
!mkdir MCP_durability_server
We create the project with uv
!cd MCP_durability_server && uv init .
Initialized project `mcp-durability-server` at `/Users/macm1/Documents/web/portafolio/posts/MCP_durability_server`
We create the virtual environment
!cd MCP_durability_server && uv venv
Using CPython 3.12.8
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate
We install the necessary libraries
!cd MCP_durability_server && uv add fastmcp
Resolved 64 packages in 344ms
Installed 61 packages in 136ms
+ annotated-types==0.7.0
+ anyio==4.10.0
+ attrs==25.3.0
+ authlib==1.6.3
+ certifi==2025.8.3
+ cffi==1.17.1
+ charset-normalizer==3.4.3
+ click==8.2.1
+ cryptography==45.0.6
+ cyclopts==3.23.0
+ dnspython==2.7.0
+ docstring-parser==0.17.0
+ docutils==0.22
+ email-validator==2.3.0
+ exceptiongroup==1.3.0
+ fastmcp==2.11.3
+ h11==0.16.0
+ httpcore==1.0.9
+ httpx==0.28.1
+ httpx-sse==0.4.1
+ idna==3.10
+ isodate==0.7.2
+ jsonschema==4.25.1
+ jsonschema-path==0.3.4
+ jsonschema-specifications==2025.4.1
+ lazy-object-proxy==1.12.0
+ markdown-it-py==4.0.0
+ markupsafe==3.0.2
+ mcp==1.13.1
+ mdurl==0.1.2
+ more-itertools==10.7.0
+ openapi-core==0.19.5
+ openapi-pydantic==0.5.1
+ openapi-schema-validator==0.6.3
+ openapi-spec-validator==0.7.2
+ parse==1.20.2
+ pathable==0.4.4
+ pycparser==2.22
+ pydantic==2.11.7
+ pydantic-core==2.33.2
+ pydantic-settings==2.10.1
+ pygments==2.19.2
+ pyperclip==1.9.0
+ python-dotenv==1.1.1
+ python-multipart==0.0.20
+ pyyaml==6.0.2
+ referencing==0.36.2
+ requests==2.32.5
+ rfc3339-validator==0.1.4
+ rich==14.1.0
+ rich-rst==1.3.1
+ rpds-py==0.27.1
+ six==1.17.0
+ sniffio==1.3.1
+ sse-starlette==3.0.2
+ starlette==0.47.3
+ typing-extensions==4.15.0
+ typing-inspection==0.4.1
+ urllib3==2.5.0
+ uvicorn==0.35.0
+ werkzeug==3.1.1
Server code
%%writefile MCP_durability_server/server.py
"""
MCP Durability Server

MCP server that implements durability for long-running agents using resource links.
Allows long-running operations to survive server restarts and provides
out-of-band state tracking.

Implemented pattern:
1. Tools return resource links immediately
2. Background processing continues asynchronously
3. Clients can poll or subscribe to resources for state updates

Usage:
    python server.py
"""

import asyncio
import json
import sqlite3
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional

from fastmcp import FastMCP
from fastmcp.server.context import Context


class TaskStatus(Enum):
    """Long-running task status."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


@dataclass
class TaskResult:
    """Result data for a task."""
    task_id: str
    status: TaskStatus
    progress: float = 0.0
    total: Optional[float] = None
    message: Optional[str] = None
    result_data: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    created_at: float = 0.0
    updated_at: float = 0.0
    completed_at: Optional[float] = None


class DurableTaskManager:
    """Manages persistent task state and background execution."""

    def __init__(self, db_path: str = "mcp_tasks.db"):
        self.db_path = db_path
        self._background_tasks: Dict[str, asyncio.Task] = {}
        self._setup_database()

    def _setup_database(self) -> None:
        """Initializes the SQLite database for task persistence."""
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                task_id TEXT PRIMARY KEY,
                status TEXT NOT NULL,
                progress REAL DEFAULT 0.0,
                total REAL,
                message TEXT,
                result_data TEXT,
                error TEXT,
                created_at REAL NOT NULL,
                updated_at REAL NOT NULL,
                completed_at REAL
            )
        """)
        conn.commit()
        conn.close()

    async def create_task(self, task_id: Optional[str] = None) -> str:
        """Creates a new task and returns its ID."""
        if not task_id:
            task_id = str(uuid.uuid4())
        current_time = time.time()
        task_result = TaskResult(
            task_id=task_id,
            status=TaskStatus.PENDING,
            created_at=current_time,
            updated_at=current_time
        )
        await self._save_task(task_result)
        return task_id

    async def start_background_task(
        self,
        task_id: str,
        task_function,
        context: Optional[Context] = None
    ) -> None:
        """Starts a background task and tracks its execution."""
        async def wrapper():
            try:
                await self._update_task_status(task_id, TaskStatus.RUNNING)
                # Execute the real task
                if context:
                    result = await task_function(task_id, context, self)
                else:
                    result = await task_function(task_id, self)
                # Mark as completed with results
                await self._update_task_completion(task_id, result)
            except asyncio.CancelledError:
                await self._update_task_status(task_id, TaskStatus.CANCELLED)
                raise
            except Exception as e:
                await self._update_task_error(task_id, str(e))
            finally:
                # Clean up the background task reference
                self._background_tasks.pop(task_id, None)

        # Start the task in the background
        task = asyncio.create_task(wrapper())
        self._background_tasks[task_id] = task

    async def _save_task(self, task_result: TaskResult) -> None:
        """Saves the task result to the database."""
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            INSERT OR REPLACE INTO tasks
            (task_id, status, progress, total, message, result_data, error,
             created_at, updated_at, completed_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            task_result.task_id,
            task_result.status.value,
            task_result.progress,
            task_result.total,
            task_result.message,
            json.dumps(task_result.result_data) if task_result.result_data else None,
            task_result.error,
            task_result.created_at,
            task_result.updated_at,
            task_result.completed_at
        ))
        conn.commit()
        conn.close()

    async def get_task(self, task_id: str) -> Optional[TaskResult]:
        """Retrieves the task result from the database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.execute("""
            SELECT task_id, status, progress, total, message, result_data, error,
                   created_at, updated_at, completed_at
            FROM tasks WHERE task_id = ?
        """, (task_id,))
        row = cursor.fetchone()
        conn.close()
        if not row:
            return None
        result_data = None
        if row[5]:  # result_data
            try:
                result_data = json.loads(row[5])
            except json.JSONDecodeError:
                pass
        return TaskResult(
            task_id=row[0],
            status=TaskStatus(row[1]),
            progress=row[2],
            total=row[3],
            message=row[4],
            result_data=result_data,
            error=row[6],
            created_at=row[7],
            updated_at=row[8],
            completed_at=row[9]
        )

    async def update_task_progress(
        self,
        task_id: str,
        progress: float,
        total: Optional[float] = None,
        message: Optional[str] = None
    ) -> None:
        """Updates the task progress."""
        task_result = await self.get_task(task_id)
        if not task_result:
            return
        task_result.progress = progress
        if total is not None:
            task_result.total = total
        if message is not None:
            task_result.message = message
        task_result.updated_at = time.time()
        await self._save_task(task_result)

    async def _update_task_status(self, task_id: str, status: TaskStatus) -> None:
        """Updates the task status."""
        task_result = await self.get_task(task_id)
        if not task_result:
            return
        task_result.status = status
        task_result.updated_at = time.time()
        await self._save_task(task_result)

    async def _update_task_completion(self, task_id: str, result_data: Any) -> None:
        """Marks the task as completed with results."""
        task_result = await self.get_task(task_id)
        if not task_result:
            return
        task_result.status = TaskStatus.COMPLETED
        task_result.progress = task_result.total or 100.0
        task_result.result_data = result_data if isinstance(result_data, dict) else {"result": result_data}
        task_result.completed_at = time.time()
        task_result.updated_at = time.time()
        await self._save_task(task_result)

    async def _update_task_error(self, task_id: str, error: str) -> None:
        """Marks the task as failed with an error."""
        task_result = await self.get_task(task_id)
        if not task_result:
            return
        task_result.status = TaskStatus.FAILED
        task_result.error = error
        task_result.updated_at = time.time()
        await self._save_task(task_result)

    async def cancel_task(self, task_id: str) -> bool:
        """Cancels a running background task."""
        if task_id in self._background_tasks:
            task = self._background_tasks[task_id]
            task.cancel()
            await self._update_task_status(task_id, TaskStatus.CANCELLED)
            return True
        return False

    async def list_tasks(self, status_filter: Optional[TaskStatus] = None) -> List[TaskResult]:
        """Lists all tasks, optionally filtered by status."""
        conn = sqlite3.connect(self.db_path)
        if status_filter:
            cursor = conn.execute("""
                SELECT task_id, status, progress, total, message, result_data, error,
                       created_at, updated_at, completed_at
                FROM tasks WHERE status = ? ORDER BY updated_at DESC
            """, (status_filter.value,))
        else:
            cursor = conn.execute("""
                SELECT task_id, status, progress, total, message, result_data, error,
                       created_at, updated_at, completed_at
                FROM tasks ORDER BY updated_at DESC
            """)
        rows = cursor.fetchall()
        conn.close()
        tasks = []
        for row in rows:
            result_data = None
            if row[5]:
                try:
                    result_data = json.loads(row[5])
                except json.JSONDecodeError:
                    pass
            tasks.append(TaskResult(
                task_id=row[0],
                status=TaskStatus(row[1]),
                progress=row[2],
                total=row[3],
                message=row[4],
                result_data=result_data,
                error=row[6],
                created_at=row[7],
                updated_at=row[8],
                completed_at=row[9]
            ))
        return tasks


# Create the MCP server with durability capabilities
server = FastMCP(
    name="MCP Durability Server",
    instructions="An MCP server that demonstrates durability for long-running agents"
)

# Initialize the task manager
task_manager = DurableTaskManager()


# Resources to track the state of tasks
@server.resource("task://status/{task_id}")
async def get_task_status(task_id: str) -> str:
    """Gets the current status of a task by ID."""
    task_result = await task_manager.get_task(task_id)
    if not task_result:
        return json.dumps({"error": "Task not found", "task_id": task_id})
    data = {
        "task_id": task_result.task_id,
        "status": task_result.status.value,
        "progress": task_result.progress,
        "total": task_result.total,
        "message": task_result.message,
        "result_data": task_result.result_data,
        "error": task_result.error,
        "created_at": task_result.created_at,
        "updated_at": task_result.updated_at,
        "completed_at": task_result.completed_at,
    }
    return json.dumps(data)


@server.resource("task://list")
async def list_all_tasks() -> str:
    """Lists all tasks with their current status."""
    tasks = await task_manager.list_tasks()
    data = {
        "tasks": [
            {
                "task_id": task.task_id,
                "status": task.status.value,
                "progress": task.progress,
                "total": task.total,
                "message": task.message,
                "created_at": task.created_at,
                "updated_at": task.updated_at,
                "completed_at": task.completed_at,
                "has_result": task.result_data is not None,
                "has_error": task.error is not None
            } for task in tasks
        ],
        "total": len(tasks)
    }
    return json.dumps(data)


@server.resource("task://list/{status}")
async def list_tasks_by_status(status: str) -> str:
    """Lists tasks filtered by status."""
    try:
        status_enum = TaskStatus(status.lower())
        tasks = await task_manager.list_tasks(status_enum)
        data = {
            "tasks": [
                {
                    "task_id": task.task_id,
                    "status": task.status.value,
                    "progress": task.progress,
                    "total": task.total,
                    "message": task.message,
                    "created_at": task.created_at,
                    "updated_at": task.updated_at,
                    "completed_at": task.completed_at
                } for task in tasks
            ],
            "status_filter": status,
            "total": len(tasks)
        }
        return json.dumps(data)
    except ValueError:
        data = {
            "error": f"Invalid status: {status}",
            "valid_statuses": [s.value for s in TaskStatus]
        }
        return json.dumps(data)


# Tools to demonstrate durability functionality
@server.tool
async def start_data_migration(
    source_path: str,
    destination_path: str,
    ctx: Context,
    record_count: int = 1000
) -> str:
    """Starts a long-running data migration task.

    Args:
        source_path: Source path of the data
        destination_path: Destination path of the data
        record_count: Number of records to migrate
        ctx: MCP context

    Returns:
        Resource URI to track progress
    """
    await ctx.info(f"Starting migration of {record_count} records from {source_path} to {destination_path}")
    print(f"Starting migration of {record_count} records from {source_path} to {destination_path}")

    async def migration_task(task_id: str, context: Context, manager: DurableTaskManager):
        """Simulated data migration task."""
        batch_size = 100
        migrated = 0
        for batch_start in range(0, record_count, batch_size):
            # Simulate batch processing
            await asyncio.sleep(1)  # Simulate work
            batch_end = min(batch_start + batch_size, record_count)
            migrated = batch_end
            # Update progress
            progress_pct = (migrated / record_count) * 100
            message = f"Migrated {migrated}/{record_count} records"
            await manager.update_task_progress(task_id, progress_pct, 100.0, message)
            try:
                await context.report_progress(migrated, record_count, message)
            except Exception:
                pass
        try:
            await context.info(f"Data migration completed: {migrated} records")
        except Exception:
            pass
        return {
            "migrated_records": migrated,
            "total_records": record_count,
            "source_path": source_path,
            "destination_path": destination_path,
            "success": True,
            "completion_time": time.time()
        }

    # Create the task and start background processing
    task_id = await task_manager.create_task()
    await task_manager.start_background_task(task_id, migration_task, ctx)
    resource_link = f"task://status/{task_id}"
    await ctx.info(f"Data migration started. Track progress at: {resource_link}")
    return resource_link


@server.tool
async def start_batch_processing(
    batch_size: int,
    total_items: int,
    ctx: Context,
    processing_delay: float = 10
) -> str:
    """Starts a batch processing task.

    Args:
        batch_size: Size of each batch
        total_items: Total number of items to process
        processing_delay: Delay per batch in seconds
        ctx: MCP context

    Returns:
        Resource URI to track progress
    """
    await ctx.info(f"Starting batch processing: {total_items} items in batches of {batch_size}")

    async def batch_task(task_id: str, context: Context, manager: DurableTaskManager):
        """Batch processing task."""
        processed = 0
        for i in range(0, total_items, batch_size):
            await asyncio.sleep(processing_delay)  # Simulate work
            batch_end = min(i + batch_size, total_items)
            processed = batch_end
            progress = (processed / total_items) * 100
            message = f"Processed {processed}/{total_items} items"
            await manager.update_task_progress(task_id, progress, 100.0, message)
            try:
                await context.report_progress(processed, total_items, message)
            except Exception:
                pass
        try:
            await context.info(f"Batch processing completed: {processed} items")
        except Exception:
            pass
        return {
            "processed": processed,
            "total": total_items,
            "batch_size": batch_size,
            "processing_delay": processing_delay,
            "success": True
        }

    task_id = await task_manager.create_task()
    await task_manager.start_background_task(task_id, batch_task, ctx)
    resource_link = f"task://status/{task_id}"
    await ctx.info(f"Batch processing started. Track progress at: {resource_link}")
    return resource_link


@server.tool
async def start_ml_training(
    model_name: str,
    ctx: Context,
    dataset_size: int = 10000,
    epochs: int = 100
) -> str:
    """Simulates machine learning model training.

    Args:
        model_name: Name of the model to train
        dataset_size: Size of the dataset
        epochs: Number of training epochs
        ctx: MCP context

    Returns:
        Resource URI to track progress
    """
    await ctx.info(f"Starting training of model '{model_name}' with {dataset_size} samples for {epochs} epochs")

    async def training_task(task_id: str, context: Context, manager: DurableTaskManager):
        """Simulated ML training task."""
        for epoch in range(1, epochs + 1):
            # Simulate training of an epoch
            await asyncio.sleep(10)
            # Simulate training metrics
            loss = 1.0 - (epoch / epochs) * 0.8 + (epoch % 10) * 0.01
            accuracy = (epoch / epochs) * 0.95 + (epoch % 5) * 0.002
            progress = (epoch / epochs) * 100
            message = f"Epoch {epoch}/{epochs} - Loss: {loss:.4f}, Accuracy: {accuracy:.4f}"
            await manager.update_task_progress(task_id, progress, 100.0, message)
            try:
                await context.report_progress(epoch, epochs, message)
            except Exception:
                pass
        try:
            await context.info(f"Model '{model_name}' training completed")
        except Exception:
            pass
        return {
            "model_name": model_name,
            "dataset_size": dataset_size,
            "epochs_completed": epochs,
            "final_loss": loss,
            "final_accuracy": accuracy,
            "success": True,
            "training_time_seconds": epochs * 0.5
        }

    task_id = await task_manager.create_task()
    await task_manager.start_background_task(task_id, training_task, ctx)
    resource_link = f"task://status/{task_id}"
    await ctx.info(f"Training started. Track progress at: {resource_link}")
    return resource_link


@server.tool
async def cancel_task(task_id: str, ctx: Context) -> str:
    """Cancels an in-progress task.

    Args:
        task_id: ID of the task to cancel
        ctx: MCP context

    Returns:
        Status message of the cancellation
    """
    success = await task_manager.cancel_task(task_id)
    if success:
        await ctx.info(f"Task {task_id} cancelled successfully")
        return f"Task {task_id} cancelled. Status available at: task://status/{task_id}"
    else:
        await ctx.warning(f"Task {task_id} not found or cannot be cancelled")
        return f"Task {task_id} not found or cannot be cancelled"


@server.tool
async def get_server_stats(ctx: Context) -> Dict[str, Any]:
    """Gets durability server statistics.

    Args:
        ctx: MCP context

    Returns:
        Server statistics
    """
    all_tasks = await task_manager.list_tasks()
    stats = {
        "total_tasks": len(all_tasks),
        "running_tasks": len([t for t in all_tasks if t.status == TaskStatus.RUNNING]),
        "completed_tasks": len([t for t in all_tasks if t.status == TaskStatus.COMPLETED]),
        "failed_tasks": len([t for t in all_tasks if t.status == TaskStatus.FAILED]),
        "cancelled_tasks": len([t for t in all_tasks if t.status == TaskStatus.CANCELLED]),
        "pending_tasks": len([t for t in all_tasks if t.status == TaskStatus.PENDING]),
        "active_background_tasks": len(task_manager._background_tasks),
    }
    await ctx.info(f"Server statistics: {stats['total_tasks']} total tasks, {stats['running_tasks']} running")
    return stats


if __name__ == "__main__":
    print("Durability MCP server started")
    print("Available tools:")
    print("  - start_data_migration: Starts data migration")
    print("  - start_batch_processing: Starts batch processing")
    print("  - start_ml_training: Simulates ML training")
    print("  - cancel_task: Cancels a task")
    print("  - get_server_stats: Gets server statistics")
    print("Available resources:")
    print("  - task://status/{task_id}: Specific task status")
    print("  - task://list: Lists all tasks")
    print("  - task://list/{status}: Lists tasks by status")
    server.run(transport="http", host="127.0.0.1", port=8080)
Writing MCP_durability_server/server.py
MCP client with durability
The client will be very similar to the ones we built in the streamable MCP and resumable MCP posts.
Implementing the MCP client with durability
There will be a DurabilityClient class
with methods to connect to the server, start the three possible tasks (data migration, batch processing, and model training), get the status of a task, list all tasks, and cancel a task.
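The heart of that client is a polling loop over the task's status resource. Here is a minimal self-contained sketch of that loop; `read_status` is a hypothetical stand-in for `client.read_resource("task://status/{task_id}")`, so it runs without a server:

```python
import asyncio
import json

# Fake status sequence standing in for successive server responses.
STATES = iter(["pending", "running", "running", "completed"])

async def read_status(task_id: str) -> dict:
    """Stand-in for reading the task://status/{task_id} resource."""
    return {"task_id": task_id, "status": next(STATES)}

async def poll_until_complete(task_id: str, interval: float = 0.01) -> dict:
    """Poll the task status until it reaches a terminal state."""
    while True:
        status = await read_status(task_id)
        if status["status"] in ("completed", "failed", "cancelled"):
            return status
        await asyncio.sleep(interval)

result = asyncio.run(poll_until_complete("demo-task"))
print(json.dumps(result))  # {"task_id": "demo-task", "status": "completed"}
```

Because the server persists each task's state, this loop works the same whether it runs in the session that started the task or in a fresh client that reconnected later.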
Create the client's virtual environment
First we create the folder where we will develop it
!mkdir MCP_durability_client
We create the project with uv
!cd MCP_durability_client && uv init .
Initialized project `mcp-durability-client` at `/Users/macm1/Documents/web/portafolio/posts/MCP_durability_client`
We create the virtual environment
!cd MCP_durability_client && uv venv
Using CPython 3.12.8
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate
We install the necessary libraries
!cd MCP_durability_client && uv add fastmcp
Resolved 64 packages in 17ms
Installed 61 packages in 80ms
+ annotated-types==0.7.0
+ anyio==4.10.0
+ attrs==25.3.0
+ authlib==1.6.3
+ certifi==2025.8.3
+ cffi==1.17.1
+ charset-normalizer==3.4.3
+ click==8.2.1
+ cryptography==45.0.6
+ cyclopts==3.23.0
+ dnspython==2.7.0
+ docstring-parser==0.17.0
+ docutils==0.22
+ email-validator==2.3.0
+ exceptiongroup==1.3.0
+ fastmcp==2.11.3
+ h11==0.16.0
+ httpcore==1.0.9
+ httpx==0.28.1
+ httpx-sse==0.4.1
+ idna==3.10
+ isodate==0.7.2
+ jsonschema==4.25.1
+ jsonschema-path==0.3.4
+ jsonschema-specifications==2025.4.1
+ lazy-object-proxy==1.12.0
+ markdown-it-py==4.0.0
+ markupsafe==3.0.2
+ mcp==1.13.1
+ mdurl==0.1.2
+ more-itertools==10.7.0
+ openapi-core==0.19.5
+ openapi-pydantic==0.5.1
+ openapi-schema-validator==0.6.3
+ openapi-spec-validator==0.7.2
+ parse==1.20.2
+ pathable==0.4.4
+ pycparser==2.22
+ pydantic==2.11.7
+ pydantic-core==2.33.2
+ pydantic-settings==2.10.1
+ pygments==2.19.2
+ pyperclip==1.9.0
+ python-dotenv==1.1.1
+ python-multipart==0.0.20
+ pyyaml==6.0.2
+ referencing==0.36.2
+ requests==2.32.5
+ rfc3339-validator==0.1.4
+ rich==14.1.0
+ rich-rst==1.3.1
+ rpds-py==0.27.1
+ six==1.17.0
+ sniffio==1.3.1
+ sse-starlette==3.0.2
+ starlette==0.47.3
+ typing-extensions==4.15.0
+ typing-inspection==0.4.1
+ urllib3==2.5.0
+ uvicorn==0.35.0
+ werkzeug==3.1.1
Client code
%%writefile MCP_durability_client/client.py
"""
Durability MCP client

Client that demonstrates how to interact with an MCP server that implements durability.

Shows how to:
1. Start long-running tasks
2. Monitor progress using resource polling
3. Subscribe to status updates (simulated)
4. Handle persistent tasks that survive server restarts

Usage:
    python client.py
"""

import asyncio
import json
import time
from typing import Any, Dict, List, Optional

from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport


class DurabilityClient:
    """Client that demonstrates durability patterns with MCP."""

    def __init__(self, server_command: List[str]):
        """Initializes the durability client.

        Args:
            server_command: Command to execute the MCP server
        """
        self.server_command = server_command
        self._polling_tasks: Dict[str, asyncio.Task] = {}

    async def connect(self) -> Client:
        """Creates and connects the client to the server."""
        transport = StreamableHttpTransport(url="http://127.0.0.1:8080/mcp")
        client = Client(transport)
        return client

    async def start_data_migration(
        self,
        client: Client,
        source_path: str = "/data/source",
        destination_path: str = "/data/destination",
        record_count: int = 1000
    ) -> str:
        """Starts a data migration task and returns the resource link.

        Args:
            client: Connected MCP client
            source_path: Source path
            destination_path: Destination path
            record_count: Number of records to migrate

        Returns:
            Resource URI to track progress
        """
        print(f"🚀 Starting data migration: {record_count} records")
        print(f"   Source: {source_path}")
        print(f"   Destination: {destination_path}")
        result = await client.call_tool(
            "start_data_migration",
            {
                "source_path": source_path,
                "destination_path": destination_path,
                "record_count": record_count
            }
        )
        resource_uri = result.content[0].text
        print(f"✅ Task started. Resource URI: {resource_uri}")
        return resource_uri

    async def start_batch_processing(
        self,
        client: Client,
        batch_size: int = 50,
        total_items: int = 500,
        processing_delay: float = 0.3
    ) -> str:
        """Starts batch processing.

        Args:
            client: Connected MCP client
            batch_size: Batch size
            total_items: Total items
            processing_delay: Processing delay

        Returns:
            Resource URI to track progress
        """
        print(f"🔄 Starting batch processing:")
        print(f"   Total items: {total_items}")
        print(f"   Batch size: {batch_size}")
        print(f"   Processing delay: {processing_delay}s")
        result = await client.call_tool(
            "start_batch_processing",
            {
                "batch_size": batch_size,
                "total_items": total_items,
                "processing_delay": processing_delay
            }
        )
        resource_uri = result.content[0].text
        print(f"✅ Batch processing started. Resource URI: {resource_uri}")
        return resource_uri

    async def start_ml_training(
        self,
        client: Client,
        model_name: str = "clasificador-texto",
        dataset_size: int = 5000,
        epochs: int = 50
    ) -> str:
        """Starts ML model training.

        Args:
            client: Connected MCP client
            model_name: Model name
            dataset_size: Dataset size
            epochs: Number of epochs

        Returns:
            Resource URI to track progress
        """
        print(f"🤖 Starting ML training:")
        print(f"   Model: {model_name}")
        print(f"   Dataset size: {dataset_size} samples")
        print(f"   Epochs: {epochs}")
        result = await client.call_tool(
            "start_ml_training",
            {
                "model_name": model_name,
                "dataset_size": dataset_size,
                "epochs": epochs
            }
        )
        resource_uri = result.content[0].text
        print(f"✅ ML training started. Resource URI: {resource_uri}")
        return resource_uri

    async def get_task_status(self, client: Client, resource_uri: str) -> Dict[str, Any]:
        """Gets the current status of a task.

        Args:
            client: Connected MCP client
            resource_uri: Resource URI of the task

        Returns:
            Current status of the task
        """
        try:
            result = await client.read_resource(resource_uri)
            # result should be a list of ReadResourceContents
            if isinstance(result, list) and len(result) > 0:
                content_block = result[0]
                # The content is in the 'text' attribute
                if hasattr(content_block, 'text') and content_block.text:
                    return json.loads(content_block.text)
                else:
                    return {"error": f"No text found in the resource. Type: {type(content_block)}, text={getattr(content_block, 'text', None)}"}
            else:
                return {"error": "Empty resource"}
        except Exception as e:
            return {"error": f"Error reading resource: {str(e)}"}

    async def poll_task_until_complete(
        self,
        client: Client,
        resource_uri: str,
        poll_interval: float = 1.0,
        max_duration: float = 10.0
    ) -> Dict[str, Any]:
        """Polls a task until it completes or fails.

        Args:
            client: Connected MCP client
            resource_uri: Resource URI of the task
            poll_interval: Interval between polls in seconds
            max_duration: Maximum duration of polling

        Returns:
            Final status of the task
        """
        task_id = resource_uri.split("/")[-1]
        print(f"📊 Monitoring task {task_id}...")
        start_time = time.time()
        last_progress = -1
        while time.time() - start_time < max_duration:
            status = await self.get_task_status(client, resource_uri)
            if "error" in status and status["error"] is not None:
                print(f"Error getting status: {status['error']}")
                return status
            current_status = status.get("status", "unknown")
            progress = status.get("progress", 0)
            message = status.get("message", "")
            # Show updates only if the progress changed
            if progress != last_progress:
                progress_bar = self._create_progress_bar(progress)
                print(f"   {progress_bar} {progress:5.1f}% | {current_status.upper()} | {message}")
                last_progress = progress
            # Check if the task is complete
            if current_status in ["completed", "failed", "cancelled"]:
                print(f"Task {current_status}: {task_id}")
                if current_status == "completed" and status.get("result_data"):
                    print("Results:")
                    self._print_results(status["result_data"])
                elif current_status == "failed" and status.get("error"):
                    print(f"Error: {status['error']}")
                return status
            await asyncio.sleep(poll_interval)
        print(f"Timeout for task {task_id}")
        return await self.get_task_status(client, resource_uri)

    def _create_progress_bar(self, progress: float, width: int = 20) -> str:
        """Creates a visual progress bar."""
        filled = int(width * progress / 100)
        bar = "█" * filled + "░" * (width - filled)
        return f"[{bar}]"

    def _print_results(self, result_data: Dict[str, Any], indent: int = 4) -> None:
        """Prints the results in a formatted way."""
        spaces = " " * indent
        for key, value in result_data.items():
            if isinstance(value, dict):
                print(f"{spaces}{key}:")
                self._print_results(value, indent + 2)
            elif isinstance(value, list):
                print(f"{spaces}{key}: {len(value)} items")
            else:
                print(f"{spaces}{key}: {value}")

    async def list_all_tasks(self, client: Client) -> Dict[str, Any]:
        """Lists all tasks on the server."""
        try:
            result = await client.read_resource("task://list")
            # result should be a list of ReadResourceContents
            if isinstance(result, list) and len(result) > 0:
                content_block = result[0]
                # The content is in the 'text' attribute
                if hasattr(content_block, 'text') and content_block.text:
                    return json.loads(content_block.text)
                else:
                    return {"error": f"No text found in the resource. Type: {type(content_block)}"}
            else:
                return {"error": "Empty resource"}
        except Exception as e:
            return {"error": f"Error reading task list: {str(e)}"}

    async def list_tasks_by_status(self, client: Client, status: str) -> Dict[str, Any]:
        """Lists tasks filtered by status."""
        try:
            result = await client.read_resource(f"task://list/{status}")
            # result should be a list of ReadResourceContents
            if isinstance(result, list) and len(result) > 0:
                content_block = result[0]
                # The content is in the 'text' attribute
                if hasattr(content_block, 'text') and content_block.text:
                    return json.loads(content_block.text)
                else:
                    return {"error": f"No text found in the resource. Type: {type(content_block)}"}
            else:
                return {"error": "Empty resource"}
        except Exception as e:
            return {"error": f"Error reading task list: {str(e)}"}

    async def print_task_list(self, client: Client) -> None:
        """Prints a formatted list of all tasks."""
        print("Task list:")
        task_list = await self.list_all_tasks(client)
        if "error" in task_list:
            print(f"   {task_list['error']}")
            return
        tasks = task_list.get("tasks", [])
        if not tasks:
            print("   No tasks")
            return
        for task in tasks:
            task_id = task["task_id"]
            status = task["status"].upper()
            progress = task.get("progress", 0)
            message = task.get("message", "")
            print(f"   ID: {task_id} | {status:9} | {progress:5.1f}% | {message}")

    async def select_task(self, client: Client) -> Optional[str]:
        """Shows a numbered list of tasks and allows the user to select one.

        Args:
            client: Connected MCP client

        Returns:
            The ID of the selected task, or None if no task was selected
        """
        print("📋 Available tasks:")
        task_list = await self.list_all_tasks(client)
        if "error" in task_list:
            print(f"   ❌ {task_list['error']}")
            return None
        tasks = task_list.get("tasks", [])
        if not tasks:
            print("   No tasks available")
            return None
        # Show tasks with numbers
        for i, task in enumerate(tasks, 1):
            task_id = task["task_id"]
            status = task["status"].upper()
            progress = task.get("progress", 0)
            message = task.get("message", "")
            print(f"   {i}. ID: {task_id} | {status:9} | {progress:5.1f}% | {message}")
        # Ask the user to select
        try:
            choice = input(f"\nSelect a task (1-{len(tasks)}) or press Enter to cancel: ").strip()
            if not choice:
                print("Selection cancelled")
                return None
            choice_num = int(choice)
            if 1 <= choice_num <= len(tasks):
                selected_task = tasks[choice_num - 1]
                selected_id = selected_task["task_id"]
                print(f"✅ Selected task: {selected_id}")
                return selected_id
            else:
                print(f"❌ Invalid selection. Please choose between 1 and {len(tasks)}")
                return None
        except ValueError:
            print("❌ Invalid input. Please enter a number")
            return None
        except Exception as e:
            print(f"❌ Error selecting task: {str(e)}")
            return None

    async def cancel_task(self, client: Client, task_id: str) -> str:
        """Cancels a specific task."""
        try:
            result = await client.call_tool("cancel_task", {"task_id": task_id})
            response = result.content[0].text
            print(f"🚫 {response}")
            return response
        except Exception as e:
            error_msg = f"Error canceling task: {str(e)}"
            print(f"❌ {error_msg}")
            return error_msg

    async def get_server_stats(self, client: Client) -> Dict[str, Any]:
        """Gets server statistics."""
        try:
            result = await client.call_tool("get_server_stats", {})
            # result.content is a list of ContentBlock
            if hasattr(result, 'content') and len(result.content) > 0:
                content = result.content[0]
                if hasattr(content, 'text'):
                    return json.loads(content.text)
                else:
                    # If not text, it may be the object directly
                    return content if isinstance(content, dict) else {"error": f"Unexpected format: {type(content)}"}
            else:
                return {"error": "Empty response from get_server_stats"}
        except Exception as e:
            return {"error": f"Error getting server statistics: {str(e)}"}

    async def print_server_stats(self, client: Client) -> None:
        """Prints server statistics."""
        print("📊 Server statistics:")
        stats = await self.get_server_stats(client)
        if "error" in stats:
            print(f"   ❌ {stats['error']}")
            return
        print(f"   Total tasks: {stats.get('total_tasks', 0)}")
        print(f"   Running: {stats.get('running_tasks', 0)}")
        print(f"   Completed: {stats.get('completed_tasks', 0)}")
        print(f"   Failed: {stats.get('failed_tasks', 0)}")
        print(f"   Cancelled: {stats.get('cancelled_tasks', 0)}")
        print(f"   Pending: {stats.get('pending_tasks', 0)}")
        print(f"   Active background tasks: {stats.get('active_background_tasks', 0)}")


async def interactive_demo(server_path: Optional[str] = None):
    """Interactive demo to explore the system."""
    print("🚀 Interactive Demo of the MCP Durability System")
    print("=" * 55)
    client_manager = DurabilityClient([])
    async with await client_manager.connect() as client:
        print("🔗 Connected to the durability server")
        while True:
            print("\n" + "=" * 40)
            print("Available options:")
            print("1. Start data migration")
            print("2. Start batch processing")
            print("3. Start ML training")
            print("4. View server statistics")
            print("5. View task list")
            print("6. Monitor specific task")
            print("7. Cancel task")
            print("8. Exit")
            try:
                choice = input("\nSelect an option (1-8): ").strip()
                if choice == "1":
                    records = int(input("Number of records to migrate (default 500): ") or "500")
                    uri = await client_manager.start_data_migration(client, record_count=records)
                    print(f"Task URI: {uri}")
                elif choice == "2":
                    total = int(input("Total elements (default 300): ") or "300")
                    batch_size = int(input("Batch size (default 30): ") or "30")
                    uri = await client_manager.start_batch_processing(client, total_items=total, batch_size=batch_size)
                    print(f"Task URI: {uri}")
                elif choice == "3":
                    model_name = input("Model name (default 'demo-model'): ") or "demo-model"
                    epochs = int(input("Number of epochs (default 25): ") or "25")
                    uri = await client_manager.start_ml_training(client, model_name=model_name, epochs=epochs)
                    print(f"Task URI: {uri}")
                elif choice == "4":
                    await client_manager.print_server_stats(client)
                elif choice == "5":
                    await client_manager.print_task_list(client)
                elif choice == "6":
                    task_id = await client_manager.select_task(client)
                    if task_id:
                        uri = f"task://status/{task_id}"
                        await client_manager.poll_task_until_complete(client, uri)
                elif choice == "7":
                    task_id = await client_manager.select_task(client)
                    if task_id:
                        await client_manager.cancel_task(client, task_id)
                elif choice == "8":
                    print("👋 Bye!")
                    break
                else:
                    print("❌ Invalid option. Please select 1-8.")
            except KeyboardInterrupt:
                print("\n👋 Demo interrupted. Bye!")
                break
            except Exception as e:
                print(f"❌ Error: {str(e)}")


if __name__ == "__main__":
    try:
        asyncio.run(interactive_demo())
    except KeyboardInterrupt:
        print("\n👋 Bye!")
    except Exception as e:
        print(f"❌ Error: {str(e)}")
Writing MCP_durability_client/client.py
Execution
First we start the server
!cd MCP_durability_server && source .venv/bin/activate && uv run server.py
Durability MCP server started
Available tools:
- start_data_migration: Starts data migration
- start_batch_processing: Starts batch processing
- start_ml_training: Simulates ML training
- cancel_task: Cancels a task
- get_server_stats: Gets server statistics
Resources available:
- task://status/{task_id}: Specific task status
- task://list: Lists all tasks
- task://list/{status}: Lists tasks by status

╭─ FastMCP 2.0 ──────────────────────────────────────────────────────────────╮
│                                                                            │
│    🖥️  Server name:     MCP Durability Server                              │
│    📦 Transport:        Streamable-HTTP                                    │
│    🔗 Server URL:       http://127.0.0.1:8080/mcp                          │
│                                                                            │
│    📚 Docs:             https://gofastmcp.com                              │
│    🚀 Deploy:           https://fastmcp.cloud                              │
│                                                                            │
│    🏎️  FastMCP version: 2.11.3                                             │
│    🤝 MCP version:      1.13.1                                             │
│                                                                            │
╰────────────────────────────────────────────────────────────────────────────╯

[08/28/25 11:46:08] INFO     Starting MCP server 'MCP Durability Server' with transport 'http' on http://127.0.0.1:8080/mcp    server.py:1522
INFO:     Started server process [55234]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8080 (Press CTRL+C to quit)
Now we run the client
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server

========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8):
First we select option 5 to check that no task is running
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server

========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8): 5
Task list:
   No tasks

========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8):
We see the output
Task list:
No tasks
There is no task running
Now we select option 1
to start the data migration task, requesting 100,000
records so that the task runs long enough for everything we need to do in this post
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8): 1
Number of records to migrate (default 500): 100000
🚀 Starting data migration: 100000 records
   Source: /data/source
   Destination: /data/destination
[08/28/25 12:05:22] INFO     Server log: Starting migration of         logging.py:40
                             100000 records from /data/source to
                             /data/destination
                    INFO     Server log: Data migration started.       logging.py:40
                             Track progress at:
                             task://status/821a210a-8672-4eba-b27c-500bf63e58c1
✅ Task started. Resource URI: task://status/821a210a-8672-4eba-b27c-500bf63e58c1
Task URI: task://status/821a210a-8672-4eba-b27c-500bf63e58c1
We select option 5
to see the task list
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8): 5
Task list:
   ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING   |   1.0% | Migrated 1000/100000 records
The task we just requested from the MCP server now shows up; we can see its ID and status, with 1,000 out of 100,000 records migrated so far
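Each task is addressed through a task:// URI (the server returned task://status/{task_id} above), and the client needs the bare task id when monitoring or cancelling. As a minimal sketch of extracting that id from the URI (the helper name is mine, not part of the client code):

```python
from urllib.parse import urlparse

def task_id_from_uri(uri: str) -> str:
    """Extract the task id from a 'task://status/{task_id}' URI."""
    parsed = urlparse(uri)
    # For task://status/<id>, urlparse puts 'status' in netloc and '/<id>' in path
    return parsed.path.lstrip("/")

print(task_id_from_uri("task://status/821a210a-8672-4eba-b27c-500bf63e58c1"))
# → 821a210a-8672-4eba-b27c-500bf63e58c1
```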
We select option 8
to exit the client
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8): 8
👋 Bye!
We run the client again
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server

========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8):
We select option 5 again
to see the list of tasks running on the server
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server

========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8): 5
Task list:
   ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING   |   3.4% | Migrated 3400/100000 records
We see that the task kept running on the server and has processed more records: before it was at 1,000 and now it is at 3,400
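This is the durability property in action: the server persists each task's state in SQLite, so progress does not depend on any client staying connected. As a simplified sketch of that idea (the real schema lives in the server's DurableTaskManager; the table and column names here are assumptions for illustration):

```python
import sqlite3

# Hypothetical, simplified version of the persistence layer: the server
# upserts a progress row as the task advances, and any client that
# (re)connects later just reads the current row.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE tasks (
        task_id  TEXT PRIMARY KEY,
        status   TEXT NOT NULL,
        progress REAL NOT NULL,
        message  TEXT
    )
""")

def save_progress(task_id: str, status: str, progress: float, message: str) -> None:
    # INSERT OR REPLACE keeps exactly one row per task
    conn.execute(
        "INSERT OR REPLACE INTO tasks VALUES (?, ?, ?, ?)",
        (task_id, status, progress, message),
    )
    conn.commit()

def load_task(task_id: str):
    row = conn.execute(
        "SELECT status, progress, message FROM tasks WHERE task_id = ?",
        (task_id,),
    ).fetchone()
    return {"status": row[0], "progress": row[1], "message": row[2]} if row else None

# The server keeps writing while the client is away...
save_progress("821a210a", "running", 1.0, "Migrated 1000/100000 records")
save_progress("821a210a", "running", 3.4, "Migrated 3400/100000 records")

# ...and a reconnecting client sees the latest state
print(load_task("821a210a"))
# → {'status': 'running', 'progress': 3.4, 'message': 'Migrated 3400/100000 records'}
```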
Now we select option 6
to monitor the task we requested from the server. This lets us watch it for 10 seconds
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8): 6
📋 Available tasks:
   1. ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING   |   4.2% | Migrated 4200/100000 records

Select a task (1-1) or press Enter to cancel: 1
✅ Selected task: 821a210a-8672-4eba-b27c-500bf63e58c1
📊 Monitoring task 821a210a-8672-4eba-b27c-500bf63e58c1...
[░░░░░░░░░░░░░░░░░░░░]   4.9% | RUNNING | Migrated 4900/100000 records
[█░░░░░░░░░░░░░░░░░░░]   5.0% | RUNNING | Migrated 5000/100000 records
[█░░░░░░░░░░░░░░░░░░░]   5.1% | RUNNING | Migrated 5100/100000 records
[█░░░░░░░░░░░░░░░░░░░]   5.2% | RUNNING | Migrated 5200/100000 records
[█░░░░░░░░░░░░░░░░░░░]   5.3% | RUNNING | Migrated 5300/100000 records
[█░░░░░░░░░░░░░░░░░░░]   5.4% | RUNNING | Migrated 5400/100000 records
[█░░░░░░░░░░░░░░░░░░░]   5.5% | RUNNING | Migrated 5500/100000 records
[█░░░░░░░░░░░░░░░░░░░]   5.6% | RUNNING | Migrated 5600/100000 records
[█░░░░░░░░░░░░░░░░░░░]   5.7% | RUNNING | Migrated 5700/100000 records
[█░░░░░░░░░░░░░░░░░░░]   5.8% | RUNNING | Migrated 5800/100000 records
Timeout for task 821a210a-8672-4eba-b27c-500bf63e58c1
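The progress lines shown above follow a simple 20-cell bar format: cells fill in proportion to the completed percentage. A minimal sketch of how such a line can be rendered (the function name is mine, not part of the client code):

```python
def render_progress(progress: float, status: str, message: str, width: int = 20) -> str:
    """Render a monitoring line like '[█░░...]   5.0% | RUNNING | ...'."""
    filled = int(progress / 100 * width)           # cells completed so far
    bar = "█" * filled + "░" * (width - filled)    # fixed-width 20-cell bar
    return f"[{bar}] {progress:5.1f}% | {status.upper()} | {message}"

print(render_progress(5.0, "running", "Migrated 5000/100000 records"))
# → [█░░░░░░░░░░░░░░░░░░░]   5.0% | RUNNING | Migrated 5000/100000 records
```

Note that at 4.9% the bar is still empty: one cell represents 5% of the work, and `int()` truncates, which matches the output above.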
Finally, we cancel the task using option 7
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8): 7
📋 Available tasks:
   1. ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING   |   6.1% | Migrated 6100/100000 records

Select a task (1-1) or press Enter to cancel: 1
✅ Selected task: 821a210a-8672-4eba-b27c-500bf63e58c1
[08/28/25 12:06:25] INFO     Server log: Task                          logging.py:40
                             821a210a-8672-4eba-b27c-500bf63e58c1
                             cancelled successfully
🚫 Task 821a210a-8672-4eba-b27c-500bf63e58c1 cancelled. Status available at: task://status/821a210a-8672-4eba-b27c-500bf63e58c1
We choose option 5 again
to see the task list
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit

Select an option (1-8): 5
Task list:
   ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | CANCELLED |   6.3% | Migrated 6300/100000 records
We see that the task we requested from the server now appears as CANCELLED
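Note that the task keeps its last progress value (6.3%) after cancellation, which points to cooperative cancellation: the worker checks a cancel flag between units of work and stops cleanly instead of being killed mid-write. A minimal asyncio sketch of that pattern (names are illustrative, not the server's actual implementation):

```python
import asyncio

async def migrate(cancel: asyncio.Event, total: int = 100, chunk: int = 10) -> dict:
    """Process records in chunks, checking the cancel flag between chunks."""
    done = 0
    while done < total:
        if cancel.is_set():
            # Stop cleanly, preserving the progress reached so far
            return {"status": "cancelled", "progress": 100 * done / total}
        await asyncio.sleep(0)  # stands in for one chunk of real work
        done += chunk
    return {"status": "completed", "progress": 100.0}

async def main() -> dict:
    cancel = asyncio.Event()
    task = asyncio.create_task(migrate(cancel))
    await asyncio.sleep(0)  # let the worker make some progress
    cancel.set()            # a cancel_task tool call would set this flag
    return await task

result = asyncio.run(main())
print(result)  # cancelled, with partial progress preserved
```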