In this post, we will see how to build an MCP server and client in which the client can request a long-running task from the server, shut down while the task keeps running on the server, and later reconnect to check the status of the task.
MCP server with durability
The server will be similar to the ones we built in the streamable MCP and resumable MCP posts. The difference is that the client can now request a long-running task, stop, and later reconnect to check the status of the task, which keeps running on the server in the meantime.
In addition, this time we will store the tasks in a SQLite database to make it more professional, since in the previous posts we stored the information in JSON files.
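As a rough idea of what storing task state in SQLite instead of JSON files looks like, here is a minimal sketch using only the standard library. The helper names (`open_db`, `save_task`, `load_task`) and the reduced set of columns are assumptions made for illustration; the full server code further down uses a larger table.

```python
import json
import sqlite3
import time


def open_db(path=":memory:"):
    """Open the database and create a reduced, illustrative tasks table."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS tasks ("
        "task_id TEXT PRIMARY KEY, status TEXT NOT NULL, "
        "progress REAL DEFAULT 0.0, result_data TEXT, updated_at REAL NOT NULL)"
    )
    conn.commit()
    return conn


def save_task(conn, task_id, status, progress=0.0, result=None):
    """Insert a task row, or overwrite it if the task_id already exists."""
    conn.execute(
        "INSERT OR REPLACE INTO tasks "
        "(task_id, status, progress, result_data, updated_at) VALUES (?, ?, ?, ?, ?)",
        (task_id, status, progress,
         json.dumps(result) if result is not None else None, time.time()),
    )
    conn.commit()


def load_task(conn, task_id):
    """Return the task as a dict, or None if it is not in the database."""
    row = conn.execute(
        "SELECT task_id, status, progress, result_data FROM tasks WHERE task_id = ?",
        (task_id,),
    ).fetchone()
    if row is None:
        return None
    return {
        "task_id": row[0],
        "status": row[1],
        "progress": row[2],
        "result_data": json.loads(row[3]) if row[3] else None,
    }
```

Passing a file path instead of `":memory:"` keeps the rows on disk, so the recorded state is still readable after the process that wrote it has exited.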
Implementing the MCP server with durability
The server will have a DurableTaskManager class that manages task creation, stores the tasks in the database, reports their status, and can cancel them.
It will expose three resources: one to get the status of a task, one to list all tasks, and one to list tasks by status.
It will also have five tools: three start specific tasks (data migration, batch data processing, and model training), one cancels a task, and the last one returns the server statistics.
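The core idea behind the task tools is that they answer right away with a resource URI while the work continues in the background. The sketch below shows that pattern with plain asyncio and an in-memory dictionary standing in for the database; `start_job`, `_work`, `TASKS` and `_BACKGROUND` are hypothetical names used only here, not part of the server code.

```python
import asyncio
import uuid

TASKS = {}        # task_id -> state dict (in-memory stand-in for the database)
_BACKGROUND = {}  # task_id -> asyncio.Task, so the running tasks stay referenced


async def _work(task_id, steps):
    """Simulated long-running job that records its progress step by step."""
    TASKS[task_id]["status"] = "running"
    for i in range(1, steps + 1):
        await asyncio.sleep(0.01)  # simulate one unit of work
        TASKS[task_id]["progress"] = 100.0 * i / steps
    TASKS[task_id]["status"] = "completed"


async def start_job(steps=5):
    """Register the task, schedule it, and return a URI without waiting for it."""
    task_id = str(uuid.uuid4())
    TASKS[task_id] = {"status": "pending", "progress": 0.0}
    _BACKGROUND[task_id] = asyncio.create_task(_work(task_id, steps))
    return f"task://status/{task_id}"


async def demo():
    uri = await start_job()
    task_id = uri.rsplit("/", 1)[-1]
    status_at_return = TASKS[task_id]["status"]  # the job has not started yet
    await _BACKGROUND[task_id]                   # a real caller would poll instead
    return status_at_return, TASKS[task_id]["status"], TASKS[task_id]["progress"]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the caller only holds a URI, it can disconnect and come back later; everything it needs to know is looked up from the stored state.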
Creating the server's virtual environment
First, we create the folder where we will develop it.
!mkdir MCP_durability_server
We initialize the project with uv
!cd MCP_durability_server && uv init .
Initialized project `mcp-durability-server` at `/Users/macm1/Documents/web/portafolio/posts/MCP_durability_server`
We create the virtual environment
!cd MCP_durability_server && uv venv
Using CPython 3.12.8
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate
We install the required libraries
!cd MCP_durability_server && uv add fastmcp
Resolved 64 packages in 344ms
Installed 61 packages in 136ms
 + annotated-types==0.7.0
 + anyio==4.10.0
 + attrs==25.3.0
 + authlib==1.6.3
 + certifi==2025.8.3
 + cffi==1.17.1
 + charset-normalizer==3.4.3
 + click==8.2.1
 + cryptography==45.0.6
 + cyclopts==3.23.0
 + dnspython==2.7.0
 + docstring-parser==0.17.0
 + docutils==0.22
 + email-validator==2.3.0
 + exceptiongroup==1.3.0
 + fastmcp==2.11.3
 + h11==0.16.0
 + httpcore==1.0.9
 + httpx==0.28.1
 + httpx-sse==0.4.1
 + idna==3.10
 + isodate==0.7.2
 + jsonschema==4.25.1
 + jsonschema-path==0.3.4
 + jsonschema-specifications==2025.4.1
 + lazy-object-proxy==1.12.0
 + markdown-it-py==4.0.0
 + markupsafe==3.0.2
 + mcp==1.13.1
 + mdurl==0.1.2
 + more-itertools==10.7.0
 + openapi-core==0.19.5
 + openapi-pydantic==0.5.1
 + openapi-schema-validator==0.6.3
 + openapi-spec-validator==0.7.2
 + parse==1.20.2
 + pathable==0.4.4
 + pycparser==2.22
 + pydantic==2.11.7
 + pydantic-core==2.33.2
 + pydantic-settings==2.10.1
 + pygments==2.19.2
 + pyperclip==1.9.0
 + python-dotenv==1.1.1
 + python-multipart==0.0.20
 + pyyaml==6.0.2
 + referencing==0.36.2
 + requests==2.32.5
 + rfc3339-validator==0.1.4
 + rich==14.1.0
 + rich-rst==1.3.1
 + rpds-py==0.27.1
 + six==1.17.0
 + sniffio==1.3.1
 + sse-starlette==3.0.2
 + starlette==0.47.3
 + typing-extensions==4.15.0
 + typing-inspection==0.4.1
 + urllib3==2.5.0
 + uvicorn==0.35.0
 + werkzeug==3.1.1
Server code
%%writefile MCP_durability_server/server.py
"""
MCP Durability Server

MCP server that implements durability for long-running agents using Resource links.
Allows long-running operations to survive server restarts and provides state tracking
out of band.

Implemented pattern:
1. Tools return resource links immediately
2. Background processing continues asynchronously
3. Clients can poll or subscribe to resources for state updates

Usage:
    python server.py
"""

import asyncio
import json
import sqlite3
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional

from fastmcp import FastMCP
from fastmcp.server.context import Context


class TaskStatus(Enum):
    """Long running task status."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


@dataclass
class TaskResult:
    """Result data for a completed task."""
    task_id: str
    status: TaskStatus
    progress: float = 0.0
    total: Optional[float] = None
    message: Optional[str] = None
    result_data: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    created_at: float = 0.0
    updated_at: float = 0.0
    completed_at: Optional[float] = None


class DurableTaskManager:
    """Manages persistent task state and background execution."""

    def __init__(self, db_path: str = "mcp_tasks.db"):
        self.db_path = db_path
        self._background_tasks: Dict[str, asyncio.Task] = {}
        self._setup_database()

    def _setup_database(self) -> None:
        """Initializes SQLite database for task persistence."""
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                task_id TEXT PRIMARY KEY,
                status TEXT NOT NULL,
                progress REAL DEFAULT 0.0,
                total REAL,
                message TEXT,
                result_data TEXT,
                error TEXT,
                created_at REAL NOT NULL,
                updated_at REAL NOT NULL,
                completed_at REAL
            )
        """)
        conn.commit()
        conn.close()

    async def create_task(self, task_id: Optional[str] = None) -> str:
        """Creates a new task and returns its ID."""
        if not task_id:
            task_id = str(uuid.uuid4())

        current_time = time.time()
        task_result = TaskResult(
            task_id=task_id,
            status=TaskStatus.PENDING,
            created_at=current_time,
            updated_at=current_time
        )
        await self._save_task(task_result)
        return task_id

    async def start_background_task(
        self,
        task_id: str,
        task_function,
        context: Optional[Context] = None
    ) -> None:
        """Starts a background task and tracks its execution."""
        async def wrapper():
            try:
                await self._update_task_status(task_id, TaskStatus.RUNNING)

                # Execute the real task
                if context:
                    result = await task_function(task_id, context, self)
                else:
                    result = await task_function(task_id, self)

                # Mark as completed with results
                await self._update_task_completion(task_id, result)
            except asyncio.CancelledError:
                await self._update_task_status(task_id, TaskStatus.CANCELLED)
                raise
            except Exception as e:
                await self._update_task_error(task_id, str(e))
            finally:
                # Clean up background task reference
                self._background_tasks.pop(task_id, None)

        # Start the task in the background
        task = asyncio.create_task(wrapper())
        self._background_tasks[task_id] = task

    async def _save_task(self, task_result: TaskResult) -> None:
        """Saves the task result to the database."""
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            INSERT OR REPLACE INTO tasks
            (task_id, status, progress, total, message, result_data, error,
             created_at, updated_at, completed_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            task_result.task_id,
            task_result.status.value,
            task_result.progress,
            task_result.total,
            task_result.message,
            json.dumps(task_result.result_data) if task_result.result_data else None,
            task_result.error,
            task_result.created_at,
            task_result.updated_at,
            task_result.completed_at
        ))
        conn.commit()
        conn.close()

    async def get_task(self, task_id: str) -> Optional[TaskResult]:
        """Retrieves the task result from the database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.execute("""
            SELECT task_id, status, progress, total, message, result_data, error,
                   created_at, updated_at, completed_at
            FROM tasks WHERE task_id = ?
        """, (task_id,))
        row = cursor.fetchone()
        conn.close()

        if not row:
            return None

        result_data = None
        if row[5]:  # result_data
            try:
                result_data = json.loads(row[5])
            except json.JSONDecodeError:
                pass

        return TaskResult(
            task_id=row[0],
            status=TaskStatus(row[1]),
            progress=row[2],
            total=row[3],
            message=row[4],
            result_data=result_data,
            error=row[6],
            created_at=row[7],
            updated_at=row[8],
            completed_at=row[9]
        )

    async def update_task_progress(
        self,
        task_id: str,
        progress: float,
        total: Optional[float] = None,
        message: Optional[str] = None
    ) -> None:
        """Updates the task progress."""
        task_result = await self.get_task(task_id)
        if not task_result:
            return

        task_result.progress = progress
        if total is not None:
            task_result.total = total
        if message is not None:
            task_result.message = message
        task_result.updated_at = time.time()
        await self._save_task(task_result)

    async def _update_task_status(self, task_id: str, status: TaskStatus) -> None:
        """Updates the task status."""
        task_result = await self.get_task(task_id)
        if not task_result:
            return

        task_result.status = status
        task_result.updated_at = time.time()
        await self._save_task(task_result)

    async def _update_task_completion(self, task_id: str, result_data: Any) -> None:
        """Marks the task as completed with results."""
        task_result = await self.get_task(task_id)
        if not task_result:
            return

        task_result.status = TaskStatus.COMPLETED
        task_result.progress = task_result.total or 100.0
        task_result.result_data = result_data if isinstance(result_data, dict) else {"result": result_data}
        task_result.completed_at = time.time()
        task_result.updated_at = time.time()
        await self._save_task(task_result)

    async def _update_task_error(self, task_id: str, error: str) -> None:
        """Marks the task as failed with error."""
        task_result = await self.get_task(task_id)
        if not task_result:
            return

        task_result.status = TaskStatus.FAILED
        task_result.error = error
        task_result.updated_at = time.time()
        await self._save_task(task_result)

    async def cancel_task(self, task_id: str) -> bool:
        """Cancels a background task in execution."""
        if task_id in self._background_tasks:
            task = self._background_tasks[task_id]
            task.cancel()
            await self._update_task_status(task_id, TaskStatus.CANCELLED)
            return True
        return False

    async def list_tasks(self, status_filter: Optional[TaskStatus] = None) -> List[TaskResult]:
        """Lists all tasks, optionally filtered by status."""
        conn = sqlite3.connect(self.db_path)
        if status_filter:
            cursor = conn.execute("""
                SELECT task_id, status, progress, total, message, result_data, error,
                       created_at, updated_at, completed_at
                FROM tasks WHERE status = ? ORDER BY updated_at DESC
            """, (status_filter.value,))
        else:
            cursor = conn.execute("""
                SELECT task_id, status, progress, total, message, result_data, error,
                       created_at, updated_at, completed_at
                FROM tasks ORDER BY updated_at DESC
            """)
        rows = cursor.fetchall()
        conn.close()

        tasks = []
        for row in rows:
            result_data = None
            if row[5]:
                try:
                    result_data = json.loads(row[5])
                except json.JSONDecodeError:
                    pass

            tasks.append(TaskResult(
                task_id=row[0],
                status=TaskStatus(row[1]),
                progress=row[2],
                total=row[3],
                message=row[4],
                result_data=result_data,
                error=row[6],
                created_at=row[7],
                updated_at=row[8],
                completed_at=row[9]
            ))
        return tasks


# Create the MCP server with durability capabilities
server = FastMCP(
    name="MCP Durability Server",
    instructions="An MCP server that demonstrates durability for long-running agents"
)

# Initialize the task manager
task_manager = DurableTaskManager()


# Resources to track the state of tasks
@server.resource("task://status/{task_id}")
async def get_task_status(task_id: str) -> str:
    """Gets the current status of a task by ID."""
    task_result = await task_manager.get_task(task_id)
    if not task_result:
        return json.dumps({"error": "Task not found", "task_id": task_id})

    data = {
        "task_id": task_result.task_id,
        "status": task_result.status.value,
        "progress": task_result.progress,
        "total": task_result.total,
        "message": task_result.message,
        "result_data": task_result.result_data,
        "error": task_result.error,
        "created_at": task_result.created_at,
        "updated_at": task_result.updated_at,
        "completed_at": task_result.completed_at,
    }
    return json.dumps(data)


@server.resource("task://list")
async def list_all_tasks() -> str:
    """Lists all tasks with their current status."""
    tasks = await task_manager.list_tasks()
    data = {
        "tasks": [
            {
                "task_id": task.task_id,
                "status": task.status.value,
                "progress": task.progress,
                "total": task.total,
                "message": task.message,
                "created_at": task.created_at,
                "updated_at": task.updated_at,
                "completed_at": task.completed_at,
                "has_result": task.result_data is not None,
                "has_error": task.error is not None
            }
            for task in tasks
        ],
        "total": len(tasks)
    }
    return json.dumps(data)


@server.resource("task://list/{status}")
async def list_tasks_by_status(status: str) -> str:
    """Lists tasks filtered by status."""
    try:
        status_enum = TaskStatus(status.lower())
        tasks = await task_manager.list_tasks(status_enum)
        data = {
            "tasks": [
                {
                    "task_id": task.task_id,
                    "status": task.status.value,
                    "progress": task.progress,
                    "total": task.total,
                    "message": task.message,
                    "created_at": task.created_at,
                    "updated_at": task.updated_at,
                    "completed_at": task.completed_at
                }
                for task in tasks
            ],
            "status_filter": status,
            "total": len(tasks)
        }
        return json.dumps(data)
    except ValueError:
        data = {
            "error": f"Invalid status: {status}",
            "valid_statuses": [s.value for s in TaskStatus]
        }
        return json.dumps(data)


# Tools to demonstrate durability functionality
@server.tool
async def start_data_migration(
    source_path: str,
    destination_path: str,
    ctx: Context,
    record_count: int = 1000
) -> str:
    """
    Starts a long-running data migration task.

    Args:
        source_path: Source path of the data
        destination_path: Destination path of the data
        record_count: Number of records to migrate
        ctx: MCP context

    Returns:
        Resource URI to track progress
    """
    await ctx.info(f"Starting migration of {record_count} records from {source_path} to {destination_path}")
    print(f"Starting migration of {record_count} records from {source_path} to {destination_path}")

    async def migration_task(task_id: str, context: Context, manager: DurableTaskManager):
        """Simulated data migration task."""
        batch_size = 100
        migrated = 0

        for batch_start in range(0, record_count, batch_size):
            # Simulate batch processing
            await asyncio.sleep(1)  # Simulate work
            batch_end = min(batch_start + batch_size, record_count)
            migrated = batch_end

            # Update progress
            progress_pct = (migrated / record_count) * 100
            message = f"Migrated {migrated}/{record_count} records"
            await manager.update_task_progress(task_id, progress_pct, 100.0, message)
            try:
                await context.report_progress(migrated, record_count, message)
            except Exception:
                pass

        try:
            await context.info(f"Data migration completed: {migrated} records")
        except Exception:
            pass

        return {
            "migrated_records": migrated,
            "total_records": record_count,
            "source_path": source_path,
            "destination_path": destination_path,
            "success": True,
            "completion_time": time.time()
        }

    # Create the task and start background processing
    task_id = await task_manager.create_task()
    await task_manager.start_background_task(task_id, migration_task, ctx)

    resource_link = f"task://status/{task_id}"
    await ctx.info(f"Data migration started. Track progress at: {resource_link}")
    return resource_link


@server.tool
async def start_batch_processing(
    batch_size: int,
    total_items: int,
    ctx: Context,
    processing_delay: float = 10
) -> str:
    """
    Starts a batch processing task.

    Args:
        batch_size: Size of each batch
        total_items: Total number of items to process
        processing_delay: Delay per batch in seconds
        ctx: MCP context

    Returns:
        Resource URI to track progress
    """
    await ctx.info(f"Starting batch processing: {total_items} items in batches of {batch_size}")

    async def batch_task(task_id: str, context: Context, manager: DurableTaskManager):
        """Batch processing task."""
        processed = 0

        for i in range(0, total_items, batch_size):
            await asyncio.sleep(processing_delay)  # Simulate work
            batch_end = min(i + batch_size, total_items)
            processed = batch_end

            progress = (processed / total_items) * 100
            message = f"Processed {processed}/{total_items} items"
            await manager.update_task_progress(task_id, progress, 100.0, message)
            try:
                await context.report_progress(processed, total_items, message)
            except Exception:
                pass

        try:
            await context.info(f"Batch processing completed: {processed} items")
        except Exception:
            pass

        return {
            "processed": processed,
            "total": total_items,
            "batch_size": batch_size,
            "processing_delay": processing_delay,
            "success": True
        }

    task_id = await task_manager.create_task()
    await task_manager.start_background_task(task_id, batch_task, ctx)

    resource_link = f"task://status/{task_id}"
    await ctx.info(f"Batch processing started. Track progress at: {resource_link}")
    return resource_link


@server.tool
async def start_ml_training(
    model_name: str,
    ctx: Context,
    dataset_size: int = 10000,
    epochs: int = 100
) -> str:
    """
    Simulates machine learning model training.

    Args:
        model_name: Name of the model to train
        dataset_size: Size of the dataset
        epochs: Number of training epochs
        ctx: MCP context

    Returns:
        Resource URI to track progress
    """
    await ctx.info(f"Starting training of model '{model_name}' with {dataset_size} samples by {epochs} epochs")

    async def training_task(task_id: str, context: Context, manager: DurableTaskManager):
        """Simulated ML training task."""
        for epoch in range(1, epochs + 1):
            # Simulate training of an epoch
            await asyncio.sleep(10)

            # Simulate training metrics
            loss = 1.0 - (epoch / epochs) * 0.8 + (epoch % 10) * 0.01
            accuracy = (epoch / epochs) * 0.95 + (epoch % 5) * 0.002

            progress = (epoch / epochs) * 100
            message = f"Epoch {epoch}/{epochs} - Loss: {loss:.4f}, Accuracy: {accuracy:.4f}"
            await manager.update_task_progress(task_id, progress, 100.0, message)
            try:
                await context.report_progress(epoch, epochs, message)
            except Exception:
                pass

        try:
            await context.info(f"Model '{model_name}' training completed")
        except Exception:
            pass

        return {
            "model_name": model_name,
            "dataset_size": dataset_size,
            "epochs_completed": epochs,
            "final_loss": loss,
            "final_accuracy": accuracy,
            "success": True,
            "training_time_seconds": epochs * 0.5
        }

    task_id = await task_manager.create_task()
    await task_manager.start_background_task(task_id, training_task, ctx)

    resource_link = f"task://status/{task_id}"
    await ctx.info(f"Training started. Track progress at: {resource_link}")
    return resource_link


@server.tool
async def cancel_task(task_id: str, ctx: Context) -> str:
    """
    Cancels an in-progress task.

    Args:
        task_id: ID of the task to cancel
        ctx: MCP context

    Returns:
        Status message of the cancellation
    """
    success = await task_manager.cancel_task(task_id)
    if success:
        await ctx.info(f"Task {task_id} cancelled successfully")
        return f"Task {task_id} cancelled. Status available at: task://status/{task_id}"
    else:
        await ctx.warning(f"Task {task_id} not found or cannot be cancelled")
        return f"Task {task_id} not found or cannot be cancelled"


@server.tool
async def get_server_stats(ctx: Context) -> Dict[str, Any]:
    """
    Gets durability server statistics.

    Args:
        ctx: MCP context

    Returns:
        Server statistics
    """
    all_tasks = await task_manager.list_tasks()
    stats = {
        "total_tasks": len(all_tasks),
        "running_tasks": len([t for t in all_tasks if t.status == TaskStatus.RUNNING]),
        "completed_tasks": len([t for t in all_tasks if t.status == TaskStatus.COMPLETED]),
        "failed_tasks": len([t for t in all_tasks if t.status == TaskStatus.FAILED]),
        "cancelled_tasks": len([t for t in all_tasks if t.status == TaskStatus.CANCELLED]),
        "pending_tasks": len([t for t in all_tasks if t.status == TaskStatus.PENDING]),
        "active_background_tasks": len(task_manager._background_tasks),
    }
    await ctx.info(f"Server statistics: {stats['total_tasks']} total tasks, {stats['running_tasks']} running")
    return stats


if __name__ == "__main__":
    print("Durability MCP server started")
    print("Available tools:")
    print(" - start_data_migration: Starts data migration")
    print(" - start_batch_processing: Starts batch processing")
    print(" - start_ml_training: Simulates ML training")
    print(" - cancel_task: Cancels a task")
    print(" - get_server_stats: Gets server statistics")
    print(" Resources available:")
    print(" - task://status/{task_id}: Specific task status")
    print(" - task://list: Lists all tasks")
    print(" - task://list/{status}: Lists tasks by status")

    server.run(transport="http", host="127.0.0.1", port=8080)
Writing MCP_durability_server/server.py
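One caveat about the code above: the asyncio tasks live in the memory of the server process (the _background_tasks dictionary), so what SQLite preserves across a server restart is the recorded state of each task, not the running coroutine. A task that was pending or running when the server stopped would stay in that state in the database. The following is a hedged sketch, not part of the server above, of one possible policy: marking such rows as failed at startup. The function name `mark_orphaned_tasks` and the error message are assumptions.

```python
import sqlite3
import time


def mark_orphaned_tasks(conn):
    """Mark tasks left pending or running by a previous process as failed.

    Assumes a tasks table with status, error and updated_at columns, as in
    the schema used by the server. Returns the number of rows changed.
    """
    cursor = conn.execute(
        "UPDATE tasks SET status = 'failed', error = ?, updated_at = ? "
        "WHERE status IN ('pending', 'running')",
        ("Interrupted by server restart", time.time()),
    )
    conn.commit()
    return cursor.rowcount
```

It would be called once, before the server starts accepting requests, with a connection to the same database file. Re-queueing the interrupted tasks instead of failing them is another option, but it requires the task parameters to be stored as well.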
MCP client with durability
The client will be very similar to the ones we built in the streamable MCP and resumable MCP posts.
Implementing the MCP client with durability
There will be a DurabilityClient class with methods to connect to the server, start the three available tasks (data migration, batch data processing, and model training), get the status of a task, list all tasks, or cancel a task.
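Checking the status of a task from the client comes down to reading the task resource repeatedly until it reaches a final state or a time limit runs out. The sketch below isolates that loop from any MCP library by taking the status reader as a parameter; `poll_until_done` and `make_fake_fetcher` are hypothetical helpers written for this illustration, and the fake fetcher simply replays a list of canned states.

```python
import asyncio
import time

TERMINAL_STATES = {"completed", "failed", "cancelled"}


async def poll_until_done(fetch_status, poll_interval=0.01, max_duration=1.0):
    """Call fetch_status() until the task is in a terminal state or time runs out."""
    deadline = time.monotonic() + max_duration
    status = await fetch_status()
    while status.get("status") not in TERMINAL_STATES and time.monotonic() < deadline:
        await asyncio.sleep(poll_interval)
        status = await fetch_status()
    return status  # last state seen, terminal or not


def make_fake_fetcher(states):
    """Build an async callable that replays states, repeating the last one."""
    iterator = iter(states)
    last = {}

    async def fetch():
        nonlocal last
        last = next(iterator, last)
        return last

    return fetch


if __name__ == "__main__":
    fetch = make_fake_fetcher([
        {"status": "running", "progress": 50.0},
        {"status": "completed", "progress": 100.0},
    ])
    print(asyncio.run(poll_until_done(fetch)))
```

Since the loop only depends on the resource URI and not on any connection state, a client that was closed and started again can resume monitoring the same task.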
Creating the client's virtual environment
First, we create the folder where we will develop it.
!mkdir MCP_durability_client
We initialize the project with uv
!cd MCP_durability_client && uv init .
Initialized project `mcp-durability-client` at `/Users/macm1/Documents/web/portafolio/posts/MCP_durability_client`
We create the virtual environment
!cd MCP_durability_client && uv venv
Using CPython 3.12.8
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate
We install the required libraries
!cd MCP_durability_client && uv add fastmcp
Resolved 64 packages in 17ms
Installed 61 packages in 80ms
 + annotated-types==0.7.0
 + anyio==4.10.0
 + attrs==25.3.0
 + authlib==1.6.3
 + certifi==2025.8.3
 + cffi==1.17.1
 + charset-normalizer==3.4.3
 + click==8.2.1
 + cryptography==45.0.6
 + cyclopts==3.23.0
 + dnspython==2.7.0
 + docstring-parser==0.17.0
 + docutils==0.22
 + email-validator==2.3.0
 + exceptiongroup==1.3.0
 + fastmcp==2.11.3
 + h11==0.16.0
 + httpcore==1.0.9
 + httpx==0.28.1
 + httpx-sse==0.4.1
 + idna==3.10
 + isodate==0.7.2
 + jsonschema==4.25.1
 + jsonschema-path==0.3.4
 + jsonschema-specifications==2025.4.1
 + lazy-object-proxy==1.12.0
 + markdown-it-py==4.0.0
 + markupsafe==3.0.2
 + mcp==1.13.1
 + mdurl==0.1.2
 + more-itertools==10.7.0
 + openapi-core==0.19.5
 + openapi-pydantic==0.5.1
 + openapi-schema-validator==0.6.3
 + openapi-spec-validator==0.7.2
 + parse==1.20.2
 + pathable==0.4.4
 + pycparser==2.22
 + pydantic==2.11.7
 + pydantic-core==2.33.2
 + pydantic-settings==2.10.1
 + pygments==2.19.2
 + pyperclip==1.9.0
 + python-dotenv==1.1.1
 + python-multipart==0.0.20
 + pyyaml==6.0.2
 + referencing==0.36.2
 + requests==2.32.5
 + rfc3339-validator==0.1.4
 + rich==14.1.0
 + rich-rst==1.3.1
 + rpds-py==0.27.1
 + six==1.17.0
 + sniffio==1.3.1
 + sse-starlette==3.0.2
 + starlette==0.47.3
 + typing-extensions==4.15.0
 + typing-inspection==0.4.1
 + urllib3==2.5.0
 + uvicorn==0.35.0
 + werkzeug==3.1.1
Client code
%%writefile MCP_durability_client/client.py
"""
Durability MCP client

Client that demonstrates how to interact with an MCP server that implements durability.

Shows how to:
1. Start long-running tasks
2. Monitor progress using resource polling
3. Subscribe to status updates (simulated)
4. Handle persistent tasks that survive server restarts

Usage:
    python client.py
"""

import argparse
import asyncio
import json
import time
from typing import Any, Dict, List, Optional

from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport


class DurabilityClient:
    """Client that demonstrates durability patterns with MCP."""

    def __init__(self, server_command: List[str]):
        """
        Initializes the durability client.

        Args:
            server_command: Command to execute the MCP server
        """
        self.server_command = server_command
        self._polling_tasks: Dict[str, asyncio.Task] = {}

    async def connect(self) -> Client:
        """Creates and connects the client to the server."""
        transport = StreamableHttpTransport(url="http://127.0.0.1:8080/mcp")
        client = Client(transport)
        return client

    async def start_data_migration(
        self,
        client: Client,
        source_path: str = "/data/source",
        destination_path: str = "/data/destination",
        record_count: int = 1000
    ) -> str:
        """
        Starts a data migration task and returns the resource link.

        Args:
            client: MCP connected client
            source_path: Source path
            destination_path: Destination path
            record_count: Number of records to migrate

        Returns:
            Resource URI to track progress
        """
        print(f"🚀 Starting data migration: {record_count} records")
        print(f" Source: {source_path}")
        print(f" Destination: {destination_path}")

        result = await client.call_tool(
            "start_data_migration",
            {
                "source_path": source_path,
                "destination_path": destination_path,
                "record_count": record_count
            }
        )
        resource_uri = result.content[0].text
        print(f"✅ Task started. Resource URI: {resource_uri}")
        return resource_uri

    async def start_batch_processing(
        self,
        client: Client,
        batch_size: int = 50,
        total_items: int = 500,
        processing_delay: float = 0.3
    ) -> str:
        """
        Starts batch processing.

        Args:
            client: MCP connected client
            batch_size: Batch size
            total_items: Total items
            processing_delay: Processing delay

        Returns:
            Resource URI to track progress
        """
        print(f"🔄 Starting batch processing:")
        print(f" Total items: {total_items}")
        print(f" Batch size: {batch_size}")
        print(f" Processing delay: {processing_delay}s")

        result = await client.call_tool(
            "start_batch_processing",
            {
                "batch_size": batch_size,
                "total_items": total_items,
                "processing_delay": processing_delay
            }
        )
        resource_uri = result.content[0].text
        print(f"✅ Batch processing started. Resource URI: {resource_uri}")
        return resource_uri

    async def start_ml_training(
        self,
        client: Client,
        model_name: str = "clasificador-texto",
        dataset_size: int = 5000,
        epochs: int = 50
    ) -> str:
        """
        Starts ML model training.

        Args:
            client: MCP connected client
            model_name: Model name
            dataset_size: Dataset size
            epochs: Number of epochs

        Returns:
            Resource URI to track progress
        """
        print(f"🤖 Starting ML training:")
        print(f" Model: {model_name}")
        print(f" Dataset size: {dataset_size} samples")
        print(f" Epochs: {epochs}")

        result = await client.call_tool(
            "start_ml_training",
            {
                "model_name": model_name,
                "dataset_size": dataset_size,
                "epochs": epochs
            }
        )
        resource_uri = result.content[0].text
        print(f"✅ ML training started. Resource URI: {resource_uri}")
        return resource_uri

    async def get_task_status(self, client: Client, resource_uri: str) -> Dict[str, Any]:
        """
        Gets the current status of a task.

        Args:
            client: MCP connected client
            resource_uri: Resource URI of the task

        Returns:
            Current status of the task
        """
        try:
            result = await client.read_resource(resource_uri)
            # result should be a list of ReadResourceContents
            if isinstance(result, list) and len(result) > 0:
                content_block = result[0]
                # The content is in the 'text' attribute
                if hasattr(content_block, 'text') and content_block.text:
                    return json.loads(content_block.text)
                else:
                    return {"error": f"No text found in the resource. Type: {type(content_block)}, text={getattr(content_block, 'text', None)}"}
            else:
                return {"error": "Empty resource"}
        except Exception as e:
            return {"error": f"Error reading resource: {str(e)}"}

    async def poll_task_until_complete(
        self,
        client: Client,
        resource_uri: str,
        poll_interval: float = 1.0,
        max_duration: float = 10.0
    ) -> Dict[str, Any]:
        """
        Polls a task until it completes or fails.

        Args:
            client: MCP connected client
            resource_uri: Resource URI of the task
            poll_interval: Interval between polls in seconds
            max_duration: Maximum duration of polling

        Returns:
            Final status of the task
        """
        task_id = resource_uri.split("/")[-1]
        print(f"📊 Monitoring task {task_id}...")

        start_time = time.time()
        last_progress = -1

        while time.time() - start_time < max_duration:
            status = await self.get_task_status(client, resource_uri)

            if "error" in status and status["error"] is not None:
                print(f"Error getting status: {status['error']}")
                return status

            current_status = status.get("status", "unknown")
            progress = status.get("progress", 0)
            message = status.get("message", "")

            # Show updates only if the progress changed
            if progress != last_progress:
                progress_bar = self._create_progress_bar(progress)
                print(f" {progress_bar} {progress:5.1f}% | {current_status.upper()} | {message}")
                last_progress = progress

            # Check if the task is complete
            if current_status in ["completed", "failed", "cancelled"]:
                print(f"Task {current_status}: {task_id}")
                if current_status == "completed" and status.get("result_data"):
                    print("Results:")
                    self._print_results(status["result_data"])
                elif current_status == "failed" and status.get("error"):
                    print(f"Error: {status['error']}")
                return status

            await asyncio.sleep(poll_interval)

        print(f"Timeout for task {task_id}")
        return await self.get_task_status(client, resource_uri)

    def _create_progress_bar(self, progress: float, width: int = 20) -> str:
        """Creates a visual progress bar."""
        filled = int(width * progress / 100)
        bar = "█" * filled + "░" * (width - filled)
        return f"[{bar}]"

    def _print_results(self, result_data: Dict[str, Any], indent: int = 4) -> None:
        """Prints the results in a formatted way."""
        spaces = " " * indent
        for key, value in result_data.items():
            if isinstance(value, dict):
                print(f"{spaces}{key}:")
                self._print_results(value, indent + 2)
            elif isinstance(value, list):
                print(f"{spaces}{key}: {len(value)} items")
            else:
                print(f"{spaces}{key}: {value}")

    async def list_all_tasks(self, client: Client) -> Dict[str, Any]:
        """Lists all tasks on the server."""
        try:
            result = await client.read_resource("task://list")
            # result should be a list of ReadResourceContents
            if isinstance(result, list) and len(result) > 0:
                content_block = result[0]
                # The content is in the 'text' attribute
                if hasattr(content_block, 'text') and content_block.text:
                    return json.loads(content_block.text)
                else:
                    return {"error": f"No text found in the resource. Type: {type(content_block)}"}
            else:
                return {"error": "Empty resource"}
        except Exception as e:
            return {"error": f"Error reading task list: {str(e)}"}

    async def list_tasks_by_status(self, client: Client, status: str) -> Dict[str, Any]:
        """Lists tasks filtered by status."""
        try:
            result = await client.read_resource(f"task://list/{status}")
            # result should be a list of ReadResourceContents
            if isinstance(result, list) and len(result) > 0:
                content_block = result[0]
                # The content is in the 'text' attribute
                if hasattr(content_block, 'text') and content_block.text:
                    return json.loads(content_block.text)
                else:
                    return {"error": f"No text found in the resource. Type: {type(content_block)}"}
            else:
                return {"error": "Empty resource"}
        except Exception as e:
            return {"error": f"Error reading task list: {str(e)}"}

    async def print_task_list(self, client: Client) -> None:
        """Prints a formatted list of all tasks."""
        print("Task list:")
        task_list = await self.list_all_tasks(client)

        if "error" in task_list:
            print(f" {task_list['error']}")
            return

        tasks = task_list.get("tasks", [])
        if not tasks:
            print(" No tasks")
            return

        for task in tasks:
            task_id = task["task_id"]
            status = task["status"].upper()
            progress = task.get("progress", 0)
            message = task.get("message", "")
            print(f" ID: {task_id} | {status:9} | {progress:5.1f}% | {message}")

    async def select_task(self, client: Client) -> Optional[str]:
        """
        Shows a numbered list of tasks and allows the user to select one.

        Args:
            client: MCP connected client

        Returns:
            The ID of the selected task, or None if no task was selected
        """
        print("📋 Available tasks:")
        task_list = await self.list_all_tasks(client)

        if "error" in task_list:
            print(f" ❌ {task_list['error']}")
            return None

        tasks = task_list.get("tasks", [])
        if not tasks:
            print(" No tasks available")
            return None

        # Show tasks with numbers
        for i, task in enumerate(tasks, 1):
            task_id = task["task_id"]
            status = task["status"].upper()
            progress = task.get("progress", 0)
            message = task.get("message", "")
            print(f" {i}. ID: {task_id} | {status:9} | {progress:5.1f}% | {message}")

        # Ask user to select
        try:
            choice = input(f" Select a task (1-{len(tasks)}) or press Enter to cancel: ").strip()
            if not choice:
                print("Selection cancelled")
                return None

            choice_num = int(choice)
            if 1 <= choice_num <= len(tasks):
                selected_task = tasks[choice_num - 1]
                selected_id = selected_task["task_id"]
                print(f"✅ Selected task: {selected_id}")
                return selected_id
            else:
                print(f"❌ Invalid selection. Please choose between 1 and {len(tasks)}")
                return None
        except ValueError:
            print("❌ Invalid input. Please enter a number")
            return None
        except Exception as e:
            print(f"❌ Error selecting task: {str(e)}")
            return None

    async def cancel_task(self, client: Client, task_id: str) -> str:
        """Cancels a specific task."""
        try:
            result = await client.call_tool("cancel_task", {"task_id": task_id})
            response = result.content[0].text
            print(f"🚫 {response}")
            return response
        except Exception as e:
            error_msg = f"Error canceling task: {str(e)}"
            print(f"❌ {error_msg}")
            return error_msg

    async def get_server_stats(self, client: Client) -> Dict[str, Any]:
        """Gets server statistics."""
        try:
            result = await client.call_tool("get_server_stats", {})
            # result.content is a list of ContentBlock
            if hasattr(result, 'content') and len(result.content) > 0:
                content = result.content[0]
                if hasattr(content, 'text'):
                    return json.loads(content.text)
                else:
                    # If not text, it may be the object directly
                    return content if isinstance(content, dict) else {"error": f"Unexpected format: {type(content)}"}
            else:
                return {"error": "Empty response from get_server_stats"}
        except Exception as e:
            return {"error": f"Error getting server statistics: {str(e)}"}

    async def print_server_stats(self, client: Client) -> None:
        """Prints server statistics."""
        print("📊 Server statistics:")
        stats = await self.get_server_stats(client)

        if "error" in stats:
            print(f" ❌ {stats['error']}")
            return

        print(f" Total tasks: {stats.get('total_tasks', 0)}")
        print(f" Running: {stats.get('running_tasks', 0)}")
        print(f" Completed: {stats.get('completed_tasks', 0)}")
        print(f" Failed: {stats.get('failed_tasks', 0)}")
        print(f" Cancelled: {stats.get('cancelled_tasks', 0)}")
        print(f" Pending: {stats.get('pending_tasks', 0)}")
        print(f" Active background tasks: {stats.get('active_background_tasks', 0)}")


async def interactive_demo(server_path: Optional[str] = None):
    """Interactive demo to explore the system."""
    print("🚀 Interactive Demo of the MCP Durability System")
    print("=" * 55)

    client_manager = DurabilityClient([])

    async with await client_manager.connect() as client:
        print("🔗 Connected to the durability server")

        while True:
            print(" " + "=" * 40)
            print("Available options:")
            print("1. Start data migration")
            print("2. Start batch processing")
            print("3. Start ML training")
            print("4. View server statistics")
            print("5. View task list")
            print("6. Monitor specific task")
            print("7. Cancel task")
            print("8. Exit")

            try:
                choice = input(" Select an option (1-8): ").strip()

                if choice == "1":
                    records = int(input("Number of records to migrate (default 500): ") or "500")
                    uri = await client_manager.start_data_migration(client, record_count=records)
                    print(f"Task URI: {uri}")
                elif choice == "2":
                    total = int(input("Total elements (default 300): ") or "300")
                    batch_size = int(input("Batch size (default 30): ") or "30")
                    uri = await client_manager.start_batch_processing(client, total_items=total, batch_size=batch_size)
                    print(f"Task URI: {uri}")
                elif choice == "3":
                    model_name = input("Model name (default 'demo-model'): ") or "demo-model"
                    epochs = int(input("Number of epochs (default 25): ") or "25")
                    uri = await client_manager.start_ml_training(client, model_name=model_name, epochs=epochs)
                    print(f"Task URI: {uri}")
                elif choice == "4":
                    await client_manager.print_server_stats(client)
                elif choice == "5":
                    await client_manager.print_task_list(client)
                elif choice == "6":
                    task_id = await client_manager.select_task(client)
                    if task_id:
                        uri = f"task://status/{task_id}"
                        await client_manager.poll_task_until_complete(client, uri)
                elif choice == "7":
                    task_id = await client_manager.select_task(client)
                    if task_id:
                        await client_manager.cancel_task(client, task_id)
                elif choice == "8":
                    print("👋 Bye!")
                    break
                else:
                    print("❌ Invalid option. Please select 1-8.")
            except KeyboardInterrupt:
                print(" 👋 Demo interrupted. Bye!")
                break
            except Exception as e:
                print(f"❌ Error: {str(e)}")


if __name__ == "__main__":
    try:
        asyncio.run(interactive_demo())
    except KeyboardInterrupt:
        print(" 👋 Bye!")
    except Exception as e:
        print(f"❌ Error: {str(e)}")
Writing MCP_durability_client/client.py
Execution
First, we start the server
!cd MCP_durability_server && source .venv/bin/activate && uv run server.py
Durability MCP server started
Available tools:
- start_data_migration: Starts data migration
- start_batch_processing: Starts batch processing
- start_ml_training: Simulates ML training
- cancel_task: Cancels a task
- get_server_stats: Gets server statistics
Resources available:
- task://status/{task_id}: Specific task status
- task://list: Lists all tasks
- task://list/{status}: Lists tasks by status
FastMCP 2.0
🖥️ Server name: MCP Durability Server
📦 Transport: Streamable-HTTP
🔗 Server URL: http://127.0.0.1:8080/mcp
📚 Docs: https://gofastmcp.com
🚀 Deploy: https://fastmcp.cloud
🏎️ FastMCP version: 2.11.3
🤝 MCP version: 1.13.1
[08/28/25 11:46:08] INFO Starting MCP server 'MCP Durability Server' with transport 'http' on http://127.0.0.1:8080/mcp  server.py:1522
INFO: Started server process [55234]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8080 (Press CTRL+C to quit)
Now we run the client
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server
========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8):
First, we select option 5 to check that no tasks are running.
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server
========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 5
Task list:
No tasks
========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8):
The output shows "Task list: No tasks", so no tasks are running.
Now we select option 1 to start the data migration task, and we enter 100000 records so that the task lasts long enough for everything we want to do in this post.
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 1
Number of records to migrate (default 500): 100000
🚀 Starting data migration: 100000 records
Source: /data/source
Destination: /data/destination
[08/28/25 12:05:22] INFO Server log: Starting migration of 100000 records from /data/source to /data/destination  logging.py:40
INFO Server log: Data migration started. Track progress at: task://status/821a210a-8672-4eba-b27c-500bf63e58c1  logging.py:40
✅ Task started. Resource URI: task://status/821a210a-8672-4eba-b27c-500bf63e58c1
Task URI: task://status/821a210a-8672-4eba-b27c-500bf63e58c1
We select option 5 to view the task list
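The resource URI printed above follows the pattern task://status/{task_id}, the same one the client builds for option 6. As a minimal sketch, the two small helpers below convert between a task ID and its status URI. The helper names are hypothetical and are not part of the client shown in this post.

```python
# Hypothetical helpers (not part of the post's client) for the
# task://status/{task_id} resource URIs shown in the output above.
TASK_URI_PREFIX = "task://status/"


def task_uri(task_id: str) -> str:
    """Build the status resource URI for a task ID."""
    return f"{TASK_URI_PREFIX}{task_id}"


def task_id_from_uri(uri: str) -> str:
    """Extract the task ID from a status resource URI."""
    if not uri.startswith(TASK_URI_PREFIX):
        raise ValueError(f"Not a task status URI: {uri!r}")
    task_id = uri[len(TASK_URI_PREFIX):]
    if not task_id:
        raise ValueError("The URI does not contain a task ID")
    return task_id


uri = task_uri("821a210a-8672-4eba-b27c-500bf63e58c1")
print(uri)
print(task_id_from_uri(uri))
```

Saving the URI (or just the ID) is all a client needs in order to find the task again after reconnecting.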
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 5
Task list:
ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING | 1.0% | Migrated 1000/100000 records
The task we just requested from the MCP server now appears in the list. We can see its ID and its status, and that it has migrated 1,000 of the 100,000 records.
We select option 8 to close the client.
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 8
👋 Bye!
We run the client again
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server
========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8):
We select option 5 again to see the list of tasks running on the server.
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server
========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 5
Task list:
ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING | 3.4% | Migrated 3400/100000 records
The task kept running on the server while the client was closed, and it has now processed more data: it had migrated 1,000 records before and now it has migrated 3,400.
Now we select option 6 to monitor the task we requested from the server. The client polls its status for about 10 seconds and then stops with a timeout.
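The task survives the client restart because its state lives on the server, in SQLite, rather than in the client session. As a rough illustration of that idea (not the post's DurableTaskManager), the sketch below stores and reloads a task row with the standard sqlite3 module. The table layout, column names and function names are assumptions made for this example.

```python
import sqlite3
from typing import Any, Dict, Optional

# Hypothetical schema: table and column names are assumptions for illustration.
SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
    task_id  TEXT PRIMARY KEY,
    status   TEXT NOT NULL,
    progress REAL NOT NULL,
    message  TEXT NOT NULL
)
"""


def open_db(path: str = ":memory:") -> sqlite3.Connection:
    """Open the database and make sure the tasks table exists."""
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row
    conn.execute(SCHEMA)
    return conn


def save_progress(conn: sqlite3.Connection, task_id: str, status: str,
                  progress: float, message: str) -> None:
    """Insert the task row, or overwrite it if the task already exists."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT OR REPLACE INTO tasks (task_id, status, progress, message) "
            "VALUES (?, ?, ?, ?)",
            (task_id, status, progress, message),
        )


def load_task(conn: sqlite3.Connection, task_id: str) -> Optional[Dict[str, Any]]:
    """Return the stored task as a dict, or None if it is unknown."""
    row = conn.execute(
        "SELECT * FROM tasks WHERE task_id = ?", (task_id,)
    ).fetchone()
    return dict(row) if row else None


conn = open_db()
save_progress(conn, "demo-task", "RUNNING", 1.0, "Migrated 1000/100000 records")
save_progress(conn, "demo-task", "RUNNING", 3.4, "Migrated 3400/100000 records")
print(load_task(conn, "demo-task"))
```

Any client that connects later only has to ask the server for the row, which is why closing and reopening the client does not lose the task.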
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 6
📋 Available tasks:
1. ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING | 4.2% | Migrated 4200/100000 records
Select a task (1-1) or press Enter to cancel: 1
✅ Selected task: 821a210a-8672-4eba-b27c-500bf63e58c1
📊 Monitoring task 821a210a-8672-4eba-b27c-500bf63e58c1...
[░░░░░░░░░░░░░░░░░░░░] 4.9% | RUNNING | Migrated 4900/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.0% | RUNNING | Migrated 5000/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.1% | RUNNING | Migrated 5100/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.2% | RUNNING | Migrated 5200/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.3% | RUNNING | Migrated 5300/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.4% | RUNNING | Migrated 5400/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.5% | RUNNING | Migrated 5500/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.6% | RUNNING | Migrated 5600/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.7% | RUNNING | Migrated 5700/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.8% | RUNNING | Migrated 5800/100000 records
Timeout for task 821a210a-8672-4eba-b27c-500bf63e58c1
Finally, we cancel the task using option 7.
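The monitoring output above is a polling loop with a deadline: read the status, draw a 20-character bar, wait, and give up when the time runs out. The sketch below reproduces that pattern with a fake status source so it runs without a server. The function names, the COMPLETED and FAILED status strings and the 1-second interval are assumptions for this example, not the post's poll_task_until_complete.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

BAR_WIDTH = 20
# Assumed final states; only RUNNING and CANCELLED appear in the post's output.
FINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED"}


def render_bar(progress: float, width: int = BAR_WIDTH) -> str:
    """Render a text progress bar for a percentage between 0 and 100."""
    filled = max(0, min(width, int(progress * width / 100)))
    return "[" + "█" * filled + "░" * (width - filled) + "]"


async def poll_until_done(
    fetch_status: Callable[[], Awaitable[Dict[str, Any]]],
    interval: float = 1.0,
    timeout: float = 10.0,
) -> Dict[str, Any]:
    """Poll fetch_status until the task reaches a final state or time runs out."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        task = await fetch_status()
        print(f"{render_bar(task['progress'])} {task['progress']:.1f}% "
              f"| {task['status']} | {task['message']}")
        if task["status"] in FINAL_STATES:
            return task
        if loop.time() + interval > deadline:
            raise TimeoutError("The task did not finish before the timeout")
        await asyncio.sleep(interval)


async def _demo() -> Dict[str, Any]:
    # Fake status source standing in for a read of task://status/{task_id}.
    states = iter([
        {"status": "RUNNING", "progress": 4.9, "message": "Migrated 4900/100000 records"},
        {"status": "RUNNING", "progress": 5.0, "message": "Migrated 5000/100000 records"},
        {"status": "COMPLETED", "progress": 100.0, "message": "Done"},
    ])

    async def fake_fetch() -> Dict[str, Any]:
        return next(states)

    return await poll_until_done(fake_fetch, interval=0.01, timeout=1.0)


final = asyncio.run(_demo())
```

With a 20-character bar each block represents 5%, which matches the first block appearing at 5.0% in the output above. A timeout only stops the monitoring on the client side; the task itself keeps running on the server.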
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 7
📋 Available tasks:
1. ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING | 6.1% | Migrated 6100/100000 records
Select a task (1-1) or press Enter to cancel: 1
✅ Selected task: 821a210a-8672-4eba-b27c-500bf63e58c1
[08/28/25 12:06:25] INFO Server log: Task 821a210a-8672-4eba-b27c-500bf63e58c1 cancelled successfully  logging.py:40
🚫 Task 821a210a-8672-4eba-b27c-500bf63e58c1 cancelled. Status available at: task://status/821a210a-8672-4eba-b27c-500bf63e58c1
Now we select option 5 again to see the list of tasks.
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 5
Task list:
ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | CANCELLED | 6.3% | Migrated 6300/100000 records
We can see that the task we requested from the server now appears as cancelled (CANCELLED).
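One common way to implement this kind of cancellation in an asyncio server is to keep a handle to the background asyncio.Task, call cancel() on it, and record the final status when the job receives CancelledError. The sketch below shows that pattern in isolation. It is an assumption about how such a server could work, not the post's cancel_task tool, and the state dictionary stands in for the database row.

```python
import asyncio
from typing import Any, Dict


async def long_job(state: Dict[str, Any], total_steps: int = 1000) -> None:
    """Simulated long-running job that records its own status and progress."""
    state["status"] = "RUNNING"
    try:
        for step in range(1, total_steps + 1):
            await asyncio.sleep(0.01)  # stands in for one unit of real work
            state["progress"] = 100 * step / total_steps
        state["status"] = "COMPLETED"
    except asyncio.CancelledError:
        # Record the final status before letting the cancellation propagate.
        state["status"] = "CANCELLED"
        raise


async def main() -> Dict[str, Any]:
    state: Dict[str, Any] = {"status": "PENDING", "progress": 0.0}
    job = asyncio.create_task(long_job(state))
    await asyncio.sleep(0.05)  # let the job make some progress
    job.cancel()               # request cancellation
    try:
        await job              # wait until the job has actually stopped
    except asyncio.CancelledError:
        pass
    return state


state = asyncio.run(main())
print(state["status"])
```

The progress value is kept after cancellation, which is consistent with the cancelled task above still reporting how many records it had migrated.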