In the previous post, streamable MCP, we saw how to make the MCP server report the progress of the process it is running, so that the client can show that information to the user.
This is useful when the process run by the MCP server takes a long time.
But what happens if that process is interrupted because the server goes down, the connection is lost, and so on? We would have to ask the MCP server to start the process again from the beginning.
To avoid that, we are going to explain how to create an MCP server and client that can continue a process already in progress. That way, if it stops for whatever reason, the process can be resumed from where it left off.
Resumable MCP server
This server is very similar to the one we built in the streamable MCP post, except that it also creates checkpoints so that a process can be resumed if it is interrupted.
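Before getting into the full implementation, the checkpoint idea can be sketched in a few lines. This is a minimal, standalone illustration and not the server code from this post: `process_with_checkpoint`, the `fail_at` parameter and the `next_step` key are hypothetical names chosen for the example. After every step the loop writes the next step to a JSON file, so a later run can start from there.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


def process_with_checkpoint(
    total_steps: int,
    checkpoint_file: Path,
    fail_at: Optional[int] = None,  # hypothetical knob to simulate a crash
) -> int:
    """Run steps start..total_steps-1 and return the step this run started from."""
    # Resume from the saved step if a checkpoint exists, otherwise start at 0
    start = 0
    if checkpoint_file.exists():
        start = json.loads(checkpoint_file.read_text())["next_step"]

    for step in range(start, total_steps):
        if fail_at is not None and step == fail_at:
            raise ConnectionError(f"simulated interruption at step {step}")
        # ... the real work for this step would go here ...
        # Persist progress only after the step has finished
        checkpoint_file.write_text(json.dumps({"next_step": step + 1}))
    return start


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        ckpt = Path(tmp) / "checkpoint.json"
        try:
            process_with_checkpoint(10, ckpt, fail_at=4)
        except ConnectionError as exc:
            print(f"Interrupted: {exc}")
        resumed_from = process_with_checkpoint(10, ckpt)
        print(f"Second run resumed from step {resumed_from}")  # step 4
```

The server below follows the same pattern, but keeps one checkpoint per task and saves it after each batch rather than after each record.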
So let's see how to implement it.
Implementing the resumable MCP server
Creating the virtual environment
First we create the folder where we are going to develop it
!mkdir MCP_resumable_server
We create the project with uv
!cd MCP_resumable_server && uv init .
Initialized project `mcp-resumable-server` at `/Users/macm1/Documents/web/portafolio/posts/MCP_resumable_server`
We create the virtual environment
!cd MCP_resumable_server && uv venv
Using CPython 3.12.8
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate
We install the required libraries
!cd MCP_resumable_server && uv add fastmcp uvicorn
Resolved 64 packages in 548ms
Prepared 1 package in 134ms
Installed 61 packages in 152ms
 + annotated-types==0.7.0
 + anyio==4.10.0
 + attrs==25.3.0
 + authlib==1.6.2
 + certifi==2025.8.3
 + cffi==1.17.1
 + charset-normalizer==3.4.3
 + click==8.2.1
 + cryptography==45.0.6
 + cyclopts==3.22.5
 + dnspython==2.7.0
 + docstring-parser==0.17.0
 + docutils==0.22
 + email-validator==2.2.0
 + exceptiongroup==1.3.0
 + fastmcp==2.11.3
 + h11==0.16.0
 + httpcore==1.0.9
 + httpx==0.28.1
 + httpx-sse==0.4.1
 + idna==3.10
 + isodate==0.7.2
 + jsonschema==4.25.1
 + jsonschema-path==0.3.4
 + jsonschema-specifications==2025.4.1
 + lazy-object-proxy==1.12.0
 + markdown-it-py==4.0.0
 + markupsafe==3.0.2
 + mcp==1.13.1
 + mdurl==0.1.2
 + more-itertools==10.7.0
 + openapi-core==0.19.5
 + openapi-pydantic==0.5.1
 + openapi-schema-validator==0.6.3
 + openapi-spec-validator==0.7.2
 + parse==1.20.2
 + pathable==0.4.4
 + pycparser==2.22
 + pydantic==2.11.7
 + pydantic-core==2.33.2
 + pydantic-settings==2.10.1
 + pygments==2.19.2
 + pyperclip==1.9.0
 + python-dotenv==1.1.1
 + python-multipart==0.0.20
 + pyyaml==6.0.2
 + referencing==0.36.2
 + requests==2.32.5
 + rfc3339-validator==0.1.4
 + rich==14.1.0
 + rich-rst==1.3.1
 + rpds-py==0.27.0
 + six==1.17.0
 + sniffio==1.3.1
 + sse-starlette==3.0.2
 + starlette==0.47.3
 + typing-extensions==4.14.1
 + typing-inspection==0.4.1
 + urllib3==2.5.0
 + uvicorn==0.35.0
 + werkzeug==3.1.1
Server code
Now we write the code needed for the resumable MCP server.
Checkpoint manager
As we said, the main difference from the server in the previous streamable MCP post is that this one saves its state in checkpoints so that the process can be resumed if it is interrupted. So let's create the checkpoint manager.
%%writefile MCP_resumable_server/checkpoint_manager.py
#!/usr/bin/env python3
"""
Checkpoint and resumability system for MCP streaming tasks.
Allows saving the state of long tasks and resuming them from where they were interrupted.
"""

import json
import pickle
import hashlib
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, asdict
from enum import Enum


class TaskStatus(Enum):
    """Task states."""
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


@dataclass
class TaskCheckpoint:
    """Represents a checkpoint of a task."""
    task_id: str
    session_id: str
    task_name: str
    parameters: Dict[str, Any]
    current_step: int
    total_steps: int
    status: TaskStatus
    created_at: datetime
    updated_at: datetime
    data: Dict[str, Any]  # Task-specific state
    error_message: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to serializable dictionary."""
        data = asdict(self)
        data['status'] = data['status'].value
        data['created_at'] = data['created_at'].isoformat()
        data['updated_at'] = data['updated_at'].isoformat()
        return data

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'TaskCheckpoint':
        """Create from dictionary."""
        data['status'] = TaskStatus(data['status'])
        data['created_at'] = datetime.fromisoformat(data['created_at'])
        data['updated_at'] = datetime.fromisoformat(data['updated_at'])
        return cls(**data)


class CheckpointManager:
    """Checkpoint manager for streaming tasks."""

    def __init__(self, storage_dir: str = "checkpoints"):
        self.storage_dir = Path(storage_dir)
        self.storage_dir.mkdir(exist_ok=True)
        self.checkpoints_file = self.storage_dir / "checkpoints.json"
        self.data_dir = self.storage_dir / "data"
        self.data_dir.mkdir(exist_ok=True)
        self._checkpoints: Dict[str, TaskCheckpoint] = {}
        self._load_checkpoints()

    def _load_checkpoints(self):
        """Load checkpoints from disk."""
        if self.checkpoints_file.exists():
            try:
                with open(self.checkpoints_file, 'r') as f:
                    data = json.load(f)
                self._checkpoints = {
                    task_id: TaskCheckpoint.from_dict(checkpoint_data)
                    for task_id, checkpoint_data in data.items()
                }
            except Exception as e:
                print(f"❌ Error loading checkpoints: {e}")
                self._checkpoints = {}

    def _save_checkpoints(self):
        """Save checkpoints to disk."""
        try:
            data = {
                task_id: checkpoint.to_dict()
                for task_id, checkpoint in self._checkpoints.items()
            }
            with open(self.checkpoints_file, 'w') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            print(f"❌ Error saving checkpoints: {e}")

    def generate_task_id(self, session_id: str, task_name: str, parameters: Dict[str, Any]) -> str:
        """Generate unique ID for a task."""
        # Create hash based on session, task and parameters
        data = f"{session_id}-{task_name}-{json.dumps(parameters, sort_keys=True)}"
        return hashlib.sha256(data.encode()).hexdigest()[:16]

    def create_checkpoint(
        self,
        session_id: str,
        task_name: str,
        parameters: Dict[str, Any],
        total_steps: int,
        initial_data: Optional[Dict[str, Any]] = None
    ) -> str:
        """Create new checkpoint."""
        task_id = self.generate_task_id(session_id, task_name, parameters)
        checkpoint = TaskCheckpoint(
            task_id=task_id,
            session_id=session_id,
            task_name=task_name,
            parameters=parameters,
            current_step=0,
            total_steps=total_steps,
            status=TaskStatus.PENDING,
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
            data=initial_data or {}
        )
        self._checkpoints[task_id] = checkpoint
        self._save_checkpoints()
        return task_id

    def update_checkpoint(
        self,
        task_id: str,
        current_step: int,
        status: TaskStatus,
        data: Optional[Dict[str, Any]] = None,
        error_message: Optional[str] = None
    ) -> bool:
        """Update existing checkpoint."""
        if task_id not in self._checkpoints:
            return False
        checkpoint = self._checkpoints[task_id]
        checkpoint.current_step = current_step
        checkpoint.status = status
        checkpoint.updated_at = datetime.now(timezone.utc)
        if data is not None:
            checkpoint.data.update(data)
        if error_message is not None:
            checkpoint.error_message = error_message
        self._save_checkpoints()
        return True

    def get_checkpoint(self, task_id: str) -> Optional[TaskCheckpoint]:
        """Get checkpoint by ID."""
        return self._checkpoints.get(task_id)

    def find_resumable_task(
        self,
        session_id: str,
        task_name: str,
        parameters: Dict[str, Any]
    ) -> Optional[TaskCheckpoint]:
        """Find resumable task with the same parameters."""
        task_id = self.generate_task_id(session_id, task_name, parameters)
        checkpoint = self._checkpoints.get(task_id)
        if checkpoint and checkpoint.status in [TaskStatus.PAUSED, TaskStatus.RUNNING]:
            return checkpoint
        return None

    def get_session_checkpoints(self, session_id: str) -> List[TaskCheckpoint]:
        """Get all checkpoints of a session."""
        return [
            checkpoint for checkpoint in self._checkpoints.values()
            if checkpoint.session_id == session_id
        ]

    def save_task_data(self, task_id: str, data: Any) -> bool:
        """Save specific task data."""
        try:
            data_file = self.data_dir / f"{task_id}.pkl"
            with open(data_file, 'wb') as f:
                pickle.dump(data, f)
            return True
        except Exception as e:
            print(f"❌ Error saving task data {task_id}: {e}")
            return False

    def load_task_data(self, task_id: str) -> Any:
        """Load specific task data."""
        try:
            data_file = self.data_dir / f"{task_id}.pkl"
            if data_file.exists():
                with open(data_file, 'rb') as f:
                    return pickle.load(f)
            return None
        except Exception as e:
            print(f"❌ Error loading task data {task_id}: {e}")
            return None

    def delete_checkpoint(self, task_id: str) -> bool:
        """Delete checkpoint and associated data."""
        try:
            # Delete checkpoint
            if task_id in self._checkpoints:
                del self._checkpoints[task_id]
                self._save_checkpoints()
            # Delete data
            data_file = self.data_dir / f"{task_id}.pkl"
            if data_file.exists():
                data_file.unlink()
            return True
        except Exception as e:
            print(f"❌ Error deleting checkpoint {task_id}: {e}")
            return False

    def cleanup_old_checkpoints(self, max_age_days: int = 7) -> int:
        """Clean up old checkpoints."""
        # Use timedelta: replace(day=day - max_age_days) raises ValueError
        # whenever the current day of the month is <= max_age_days
        cutoff_date = datetime.now(timezone.utc) - timedelta(days=max_age_days)
        to_delete = []
        for task_id, checkpoint in self._checkpoints.items():
            if (checkpoint.status in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED]
                    and checkpoint.updated_at < cutoff_date):
                to_delete.append(task_id)
        for task_id in to_delete:
            self.delete_checkpoint(task_id)
        return len(to_delete)

    def get_stats(self) -> Dict[str, int]:
        """Get checkpoint statistics."""
        stats = {status.value: 0 for status in TaskStatus}
        for checkpoint in self._checkpoints.values():
            stats[checkpoint.status.value] += 1
        return stats


# Singleton global for checkpoint manager
checkpoint_manager = CheckpointManager()
Writing MCP_resumable_server/checkpoint_manager.py
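One detail of the manager worth isolating is how `generate_task_id` makes resumption possible: the ID is a hash of the session (or client) identifier, the task name and the parameters, so calling the same task again with the same arguments yields the same ID and therefore finds the same checkpoint. The snippet below copies that logic into a standalone function; the identifiers `client-1` and the parameter values are made up for the example.

```python
import hashlib
import json
from typing import Any, Dict


def generate_task_id(session_id: str, task_name: str, parameters: Dict[str, Any]) -> str:
    # sort_keys=True makes the JSON text independent of dict insertion order
    data = f"{session_id}-{task_name}-{json.dumps(parameters, sort_keys=True)}"
    return hashlib.sha256(data.encode()).hexdigest()[:16]


# Same client, task and parameters (in a different key order): same ID
first = generate_task_id("client-1", "resumable_data_processing",
                         {"total_records": 50, "batch_size": 5})
again = generate_task_id("client-1", "resumable_data_processing",
                         {"batch_size": 5, "total_records": 50})
# Changing any parameter produces a different task, hence a fresh checkpoint
other = generate_task_id("client-1", "resumable_data_processing",
                         {"total_records": 50, "batch_size": 10})

print(first == again)  # True
print(first == other)  # False
print(len(first))      # 16
```

A consequence of this design is that the identifier passed as `session_id` has to survive a restart; otherwise the hash changes and the old checkpoint is never found.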
Server
Now we create the server code
%%writefile MCP_resumable_server/server.py
#!/usr/bin/env python3
"""
Server MCP with resumability and checkpoints.
Demo how to implement tasks that can be paused and resumed.
"""

import asyncio
import uvicorn
import random
from typing import Dict, List, Optional, Any

from fastmcp import FastMCP, Context
from fastmcp.server.http import create_streamable_http_app

from checkpoint_manager import checkpoint_manager, TaskStatus, TaskCheckpoint

# Create instance of the MCP server
mcp = FastMCP(
    name="Resumable Server",
    instructions="MCP server with resumability and checkpoints for long tasks"
)


@mcp.tool
async def resumable_data_processing(
    dataset_name: str = "default_dataset",
    total_records: int = 100,
    batch_size: int = 10,
    client_id: str = None,  # Add persistent client_id
    context: Context = None
) -> Dict[str, Any]:
    """
    Resumable data processing with automatic checkpoints.

    Args:
        dataset_name: Name of the dataset to process
        total_records: Total records to process
        batch_size: Size of the batch per iteration
        client_id: Persistent client id for resumability
    """
    if not context:
        return {"error": "Requires context for resumability"}

    # Use persistent client id if available, otherwise use session_id
    persistent_id = client_id if client_id else context.session_id
    if not persistent_id:
        return {"error": "Requires client_id or session_id for resumability"}

    task_name = "resumable_data_processing"
    parameters = {
        "dataset_name": dataset_name,
        "total_records": total_records,
        "batch_size": batch_size
    }

    # Search for existing task to resume - include interrupted tasks
    task_id = checkpoint_manager.generate_task_id(persistent_id, task_name, parameters)
    existing_checkpoint = checkpoint_manager.get_checkpoint(task_id)

    if existing_checkpoint and existing_checkpoint.current_step > 0:
        # Resume from checkpoint
        await context.info(f"🔄 Resuming processing from record {existing_checkpoint.current_step}")
        start_record = existing_checkpoint.current_step
        processed_data = checkpoint_manager.load_task_data(task_id) or []
        # Mark as running
        checkpoint_manager.update_checkpoint(task_id, start_record, TaskStatus.RUNNING)
    else:
        # New task
        await context.info(f"🆕 Starting new processing of {total_records} records")
        if not existing_checkpoint:
            task_id = checkpoint_manager.create_checkpoint(
                persistent_id, task_name, parameters, total_records
            )
        start_record = 0
        processed_data = []
        # Mark as running
        checkpoint_manager.update_checkpoint(task_id, 0, TaskStatus.RUNNING)

    # Defined before the loop so the except block can always reference it
    record_num = start_record

    try:
        # Process in batches
        for record_num in range(start_record, total_records, batch_size):
            batch_end = min(record_num + batch_size, total_records)

            # Simulate batch processing
            await asyncio.sleep(1)  # Simulate work

            # Process batch
            batch_results = []
            for i in range(record_num, batch_end):
                # Simulate record processing
                result = {
                    "record_id": f"{dataset_name}_{i:06d}",
                    "processed_at": f"step_{i}",
                    "value": random.randint(1, 1000)
                }
                batch_results.append(result)

            processed_data.extend(batch_results)

            # Save checkpoint for each batch
            checkpoint_manager.update_checkpoint(
                task_id,
                batch_end,
                TaskStatus.RUNNING,
                {"last_batch": batch_results}
            )

            # Save processed data
            checkpoint_manager.save_task_data(task_id, processed_data)

            # Report progress
            await context.report_progress(
                progress=batch_end,
                total=total_records,
                message=f"Processed {batch_end}/{total_records} records"
            )
            await context.debug(f"Batch {record_num}-{batch_end-1} completed")

        # Mark as completed
        checkpoint_manager.update_checkpoint(task_id, total_records, TaskStatus.COMPLETED)
        await context.info(f"✅ Processing completed: {len(processed_data)} records")

        return {
            "task_id": task_id,
            "dataset_name": dataset_name,
            "total_processed": len(processed_data),
            "records": processed_data[:5],  # Show first 5
            "status": "completed"
        }

    except Exception as e:
        # Mark as failed
        checkpoint_manager.update_checkpoint(
            task_id, record_num, TaskStatus.FAILED, error_message=str(e)
        )
        await context.error(f"❌ Error in processing: {e}")
        raise


@mcp.tool
async def pause_task(
    task_id: str,
    context: Context = None
) -> Dict[str, str]:
    """
    Pause a task in execution.

    Args:
        task_id: ID of the task to pause
    """
    checkpoint = checkpoint_manager.get_checkpoint(task_id)
    if not checkpoint:
        return {"error": f"Task {task_id} not found"}

    if checkpoint.status != TaskStatus.RUNNING:
        return {"error": f"Task {task_id} is not running"}

    success = checkpoint_manager.update_checkpoint(
        task_id, checkpoint.current_step, TaskStatus.PAUSED
    )

    if success:
        if context:
            await context.info(f"⏸️ Task {task_id} paused")
        return {"message": f"Task {task_id} paused correctly"}
    else:
        return {"error": f"Could not pause task {task_id}"}


@mcp.tool
async def list_session_tasks(
    client_id: str = None,  # Add persistent client_id
    context: Context = None
) -> Dict[str, Any]:
    """
    List all tasks of the current session or the specified client.

    Args:
        client_id: Persistent client id (optional)
    """
    if not context:
        return {"error": "Requires context"}

    # Use persistent client id if available, otherwise use session_id
    persistent_id = client_id if client_id else context.session_id
    if not persistent_id:
        return {"error": "Requires client_id or session_id"}

    checkpoints = checkpoint_manager.get_session_checkpoints(persistent_id)

    tasks = []
    for checkpoint in checkpoints:
        task_info = {
            "task_id": checkpoint.task_id,
            "task_name": checkpoint.task_name,
            "status": checkpoint.status.value,
            "progress": f"{checkpoint.current_step}/{checkpoint.total_steps}",
            "progress_percent": round((checkpoint.current_step / checkpoint.total_steps) * 100, 1) if checkpoint.total_steps > 0 else 0,
            "created_at": checkpoint.created_at.isoformat(),
            "updated_at": checkpoint.updated_at.isoformat()
        }
        tasks.append(task_info)

    return {
        "session_id": context.session_id,
        "persistent_id": persistent_id,
        "total_tasks": len(tasks),
        "tasks": tasks
    }


@mcp.tool
async def get_checkpoint_stats(context: Context = None) -> Dict[str, Any]:
    """Get global checkpoint statistics."""
    stats = checkpoint_manager.get_stats()
    return {
        "checkpoint_stats": stats,
        "total_checkpoints": sum(stats.values())
    }


@mcp.tool
async def get_session_info(context: Context = None) -> Dict[str, Any]:
    """Get information about the current session."""
    if not context:
        return {"error": "No context available"}

    return {
        "session_id": context.session_id,
        "request_id": context.request_id,
        "client_id": context.client_id
    }


async def run_resumable_server(host: str = "127.0.0.1", port: int = 8001):
    """Run server with resumability capabilities."""
    print(f"🚀 Starting MCP server with resumability at {host}:{port}")

    # Create application
    app = create_streamable_http_app(
        server=mcp,
        streamable_http_path="/mcp/",
        stateless_http=False,  # IMPORTANT: stateful for sessions
        debug=True
    )

    config = uvicorn.Config(
        app=app,
        host=host,
        port=port,
        log_level="info",
        access_log=False
    )
    server = uvicorn.Server(config)

    print(f"✅ Resumable server ready at http://{host}:{port}/mcp/")
    print("🔧 Tools with resumability:")
    print("   - resumable_data_processing: Processing with checkpoints")
    # print("   - large_file_download: Download resumable")
    # print("   - machine_learning_training: ML training resumable")
    print("   - pause_task: Pause tasks")
    print("   - list_session_tasks: List session tasks")
    print("   - get_checkpoint_stats: Checkpoint statistics")
    print("   - get_session_info: Current session information")

    await server.serve()


if __name__ == "__main__":
    try:
        asyncio.run(run_resumable_server())
    except KeyboardInterrupt:
        print("\n⏹️ Server stopped by user")
    except Exception as e:
        print(f"❌ Error running server: {e}")
Writing MCP_resumable_server/server.py
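The part of `resumable_data_processing` that does the resuming is just the starting point of the `range`. Because the checkpoint stores `batch_end` after each batch, a resumed run always starts on a batch boundary and only the pending batches are processed. The helper below is a hypothetical function, not part of the server, that lists the batches a run would go through for a given starting record.

```python
from typing import List, Tuple


def remaining_batches(start_record: int, total_records: int, batch_size: int) -> List[Tuple[int, int]]:
    """Return the (first, end) record ranges still to process; end is exclusive."""
    return [
        (first, min(first + batch_size, total_records))
        for first in range(start_record, total_records, batch_size)
    ]


# Fresh task: every batch is pending
print(remaining_batches(0, 12, 5))   # [(0, 5), (5, 10), (10, 12)]
# Resumed after the checkpoint recorded current_step=10: only the tail is left
print(remaining_batches(10, 12, 5))  # [(10, 12)]
# Checkpoint already at the end: nothing left to do
print(remaining_batches(12, 12, 5))  # []
```

If the server dies in the middle of a batch, that batch has not been checkpointed yet, so it is simply processed again from its first record on the next run.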
Resumable MCP client
Here too the client is very similar to the one from the previous streamable MCP post, but we add a JSON file with information about the sessions that have been created, so that if a session is interrupted in the middle of a process it can continue from where it left off.
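The key piece of that JSON file is an identifier that survives restarts, since the server's own session ID can change between connections. As a rough sketch of the idea (the function name `load_or_create_client_id` and the one-key file layout are assumptions for this example; the real client stores more fields), the client reads the ID from disk if the file exists and generates and saves a new one otherwise.

```python
import json
import tempfile
import uuid
from pathlib import Path


def load_or_create_client_id(state_file: Path) -> str:
    """Return the persisted client ID, creating and saving one on first use."""
    if state_file.exists():
        return json.loads(state_file.read_text())["client_id"]
    # First run: generate an ID and persist it for later sessions
    client_id = str(uuid.uuid4())[:16]
    state_file.write_text(json.dumps({"client_id": client_id}, indent=2))
    return client_id


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        state = Path(tmp) / "session_state.json"
        first_run = load_or_create_client_id(state)
        second_run = load_or_create_client_id(state)  # simulates a restart
        print(first_run == second_run)  # True
```

Sending this ID with every tool call is what lets the server map a reconnecting client back to its unfinished task.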
Implementing the resumable MCP client
Creating the virtual environment for the client
First we create the folder where we are going to develop it
!mkdir MCP_resumable_client
We create the project with uv
!cd MCP_resumable_client && uv init .
Initialized project `mcp-resumable-client` at `/Users/macm1/Documents/web/portafolio/posts/MCP_resumable_client`
We create the virtual environment
!cd MCP_resumable_client && uv venv
Using CPython 3.12.8
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate
We install the required libraries
!cd MCP_resumable_client && uv add fastmcp
Resolved 64 packages in 314ms
Installed 61 packages in 145ms
 + annotated-types==0.7.0
 + anyio==4.10.0
 + attrs==25.3.0
 + authlib==1.6.2
 + certifi==2025.8.3
 + cffi==1.17.1
 + charset-normalizer==3.4.3
 + click==8.2.1
 + cryptography==45.0.6
 + cyclopts==3.22.5
 + dnspython==2.7.0
 + docstring-parser==0.17.0
 + docutils==0.22
 + email-validator==2.2.0
 + exceptiongroup==1.3.0
 + fastmcp==2.11.3
 + h11==0.16.0
 + httpcore==1.0.9
 + httpx==0.28.1
 + httpx-sse==0.4.1
 + idna==3.10
 + isodate==0.7.2
 + jsonschema==4.25.1
 + jsonschema-path==0.3.4
 + jsonschema-specifications==2025.4.1
 + lazy-object-proxy==1.12.0
 + markdown-it-py==4.0.0
 + markupsafe==3.0.2
 + mcp==1.13.1
 + mdurl==0.1.2
 + more-itertools==10.7.0
 + openapi-core==0.19.5
 + openapi-pydantic==0.5.1
 + openapi-schema-validator==0.6.3
 + openapi-spec-validator==0.7.2
 + parse==1.20.2
 + pathable==0.4.4
 + pycparser==2.22
 + pydantic==2.11.7
 + pydantic-core==2.33.2
 + pydantic-settings==2.10.1
 + pygments==2.19.2
 + pyperclip==1.9.0
 + python-dotenv==1.1.1
 + python-multipart==0.0.20
 + pyyaml==6.0.2
 + referencing==0.36.2
 + requests==2.32.5
 + rfc3339-validator==0.1.4
 + rich==14.1.0
 + rich-rst==1.3.1
 + rpds-py==0.27.0
 + six==1.17.0
 + sniffio==1.3.1
 + sse-starlette==3.0.2
 + starlette==0.47.3
 + typing-extensions==4.14.1
 + typing-inspection==0.4.1
 + urllib3==2.5.0
 + uvicorn==0.35.0
 + werkzeug==3.1.1
Client code
Now we create the MCP client. It is also going to be very similar to the one from the previous streamable MCP post (this is the last time I say this, I swear), except that we also save the sessions in a JSON file so that we can recover the ones that were left halfway through a server process
%%writefile MCP_resumable_client/client.py
#!/usr/bin/env python3
"""
Client MCP with resumability and session persistence capabilities.
Demonstrates how to connect to a resumable server and handle interruptions.
"""

import asyncio
import json
import re
import time
import uuid
from typing import Any, Dict, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path

from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport


@dataclass
class TaskProgress:
    """Represents the progress of a task."""
    task_id: str
    task_name: str
    current_step: int
    total_steps: int
    last_message: str
    started_at: datetime = field(default_factory=datetime.now)
    last_update: datetime = field(default_factory=datetime.now)

    @property
    def progress_percent(self) -> float:
        """Calculate progress percentage."""
        if self.total_steps == 0:
            return 0.0
        return (self.current_step / self.total_steps) * 100


@dataclass
class SessionState:
    """Session state of the client."""
    client_id: str  # Persistent client ID
    session_id: str  # Session ID of the server (can change)
    server_url: str
    active_tasks: Dict[str, TaskProgress]
    completed_tasks: List[str]
    created_at: datetime = field(default_factory=datetime.now)


class ResumableProgressHandler:
    """Handles progress with resumability capabilities."""

    def __init__(self, task_name: str, session_state: SessionState):
        self.task_name = task_name
        self.session_state = session_state
        self.start_time = time.time()
        self.last_progress = 0
        self.task_progress: Optional[TaskProgress] = None

    def __call__(self, progress: float, total: float, message: str):
        """Callback for progress updates."""
        # Create or update task progress
        if not self.task_progress:
            # Try to find task_id from the message
            task_id = self._extract_task_id(message)
            if not task_id:
                task_id = f"task_{int(time.time())}"

            self.task_progress = TaskProgress(
                task_id=task_id,
                task_name=self.task_name,
                current_step=int(progress),
                total_steps=int(total),
                last_message=message
            )
            self.session_state.active_tasks[task_id] = self.task_progress
        else:
            self.task_progress.current_step = int(progress)
            self.task_progress.total_steps = int(total)
            self.task_progress.last_message = message
            self.task_progress.last_update = datetime.now()

        # Display progress visually
        self._display_progress(progress, total, message)

        # Save session state
        self._save_session_state()

    def _extract_task_id(self, message: str) -> Optional[str]:
        """Try to extract task_id from the message."""
        # Search for patterns like "task_id: abc123" in the message
        match = re.search(r'task[_\s]*id[:\s]*([a-f0-9]{16})', message.lower())
        return match.group(1) if match else None

    def _display_progress(self, progress: float, total: float, message: str):
        """Display progress visually."""
        percentage = (progress / total) * 100 if total > 0 else 0
        bar_length = 30
        filled_length = int(bar_length * percentage / 100)
        bar = '█' * filled_length + '░' * (bar_length - filled_length)
        elapsed = time.time() - self.start_time

        # \r returns to the start of the line so the bar is redrawn in place
        print(f"\r📊 {self.task_name}: |{bar}| {percentage:.1f}% "
              f"({progress:.0f}/{total:.0f}) - {message} [{elapsed:.1f}s]",
              end='', flush=True)

        if progress >= total:
            print()  # New line when completed
            if self.task_progress:
                # Move to completed
                task_id = self.task_progress.task_id
                self.session_state.completed_tasks.append(task_id)
                if task_id in self.session_state.active_tasks:
                    del self.session_state.active_tasks[task_id]
                self._save_session_state()

    def _save_session_state(self):
        """Save session state."""
        # In a real case, this would be saved to disk
        pass


class MCPResumableClient:
    """Client MCP with resumability capabilities."""

    def __init__(self, server_url: str = "http://localhost:8001/mcp/"):
        self.server_url = server_url
        self.transport = None
        self.client = None
        self.session_state: Optional[SessionState] = None
        self.state_file = Path("session_state.json")

    async def __aenter__(self):
        """Initialize connection and load state."""
        # Load previous state if it exists
        self._load_session_state()

        self.transport = StreamableHttpTransport(
            url=self.server_url,
            sse_read_timeout=120.0  # Timeout for resumable tasks
        )
        self.client = Client(transport=self.transport)
        await self.client.__aenter__()

        # Create new session state if it doesn't exist
        if not self.session_state:
            # Generate unique persistent client ID
            client_id = str(uuid.uuid4())[:16]  # Use only the first 16 characters

            # Get real session_id from the server
            try:
                session_info_result = await self.client.call_tool("get_session_info", {})
                # Extract content from CallToolResult
                if hasattr(session_info_result, 'content') and session_info_result.content:
                    session_info_text = session_info_result.content[0].text
                    if isinstance(session_info_text, str):
                        session_info = json.loads(session_info_text)
                    else:
                        session_info = session_info_text
                else:
                    session_info = {}
                real_session_id = session_info.get("session_id", f"session_{int(time.time())}")
                print("🏗️ Creating new session")
                print(f"🔗 Session ID of the server: {real_session_id}")
                print(f"🆔 Persistent Client ID: {client_id}")
            except Exception as e:
                print(f"⚠️ Could not get session_id from the server: {e}")
                real_session_id = f"session_{int(time.time())}"

            self.session_state = SessionState(
                client_id=client_id,
                session_id=real_session_id,
                server_url=self.server_url,
                active_tasks={},
                completed_tasks=[]
            )
            self._save_session_state()
        else:
            # Reuse existing client_id
            print(f"🆔 Reusing Persistent Client ID: {self.session_state.client_id}")

            # Verify if the session_id matches the server
            try:
                session_info_result = await self.client.call_tool("get_session_info", {})
                # Extract content from CallToolResult
                if hasattr(session_info_result, 'content') and session_info_result.content:
                    session_info_text = session_info_result.content[0].text
                    if isinstance(session_info_text, str):
                        session_info = json.loads(session_info_text)
                    else:
                        session_info = session_info_text
                else:
                    session_info = {}
                server_session_id = session_info.get("session_id")
                if server_session_id and server_session_id != self.session_state.session_id:
                    print(f"⚠️ Session ID of the server changed: {self.session_state.session_id} → {server_session_id}")
                    self.session_state.session_id = server_session_id
                    self._save_session_state()
            except Exception as e:
                print(f"⚠️ Could not verify session_id: {e}")

        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close connection and save state."""
        if self.client:
            await self.client.__aexit__(exc_type, exc_val, exc_tb)
        self._save_session_state()

    def _load_session_state(self):
        """Load session state from disk."""
        if self.state_file.exists():
            try:
                with open(self.state_file, 'r') as f:
                    data = json.load(f)

                # Reconstruct objects
                active_tasks = {}
                for task_id, task_data in data.get('active_tasks', {}).items():
                    task_data['started_at'] = datetime.fromisoformat(task_data['started_at'])
                    task_data['last_update'] = datetime.fromisoformat(task_data['last_update'])
                    active_tasks[task_id] = TaskProgress(**task_data)

                self.session_state = SessionState(
                    client_id=data.get('client_id', str(uuid.uuid4())[:16]),  # Generate if not exists
                    session_id=data['session_id'],
                    server_url=data['server_url'],
                    active_tasks=active_tasks,
                    completed_tasks=data.get('completed_tasks', []),
                    created_at=datetime.fromisoformat(data['created_at'])
                )
                print(f"📂 Session state loaded: {self.session_state.session_id}")
            except Exception as e:
                print(f"⚠️ Error loading session state: {e}")
                self.session_state = None
        else:
            print("💬 No session state found, starting new session")
            self.session_state = None

    def _save_session_state(self):
        """Save session state to disk."""
        if not self.session_state:
            return

        try:
            # Convert to serializable format
            active_tasks = {}
            for task_id, task_progress in self.session_state.active_tasks.items():
                active_tasks[task_id] = {
                    'task_id': task_progress.task_id,
                    'task_name': task_progress.task_name,
                    'current_step': task_progress.current_step,
                    'total_steps': task_progress.total_steps,
                    'last_message': task_progress.last_message,
                    'started_at': task_progress.started_at.isoformat(),
                    'last_update': task_progress.last_update.isoformat()
                }

            data = {
                'client_id': self.session_state.client_id,
                'session_id': self.session_state.session_id,
                'server_url': self.session_state.server_url,
                'active_tasks': active_tasks,
                'completed_tasks': self.session_state.completed_tasks,
                'created_at': self.session_state.created_at.isoformat()
            }

            with open(self.state_file, 'w') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            print(f"⚠️ Error saving session state: {e}")

    async def test_connection(self) -> bool:
        """Test connection to the server."""
        try:
            await self.client.ping()
            print("✅ Connection established with the resumable server")
            return True
        except Exception as e:
            print(f"❌ Connection error: {e}")
            return False

    async def call_resumable_tool(
        self,
        tool_name: str,
        parameters: Dict[str, Any],
        allow_resume: bool = True
    ) -> Dict[str, Any]:
        """Call tool with resumability capabilities."""
        progress_handler = ResumableProgressHandler(tool_name, self.session_state)
        start_time = time.time()

        # Add client_id to the parameters if allow_resume is enabled
        if allow_resume and self.session_state:
            parameters = {**parameters, 'client_id': self.session_state.client_id}

        try:
            print(f"🚀 Executing {tool_name} (resumption: {'✅' if allow_resume else '❌'})")
            result = await self.client.call_tool(
                tool_name,
                parameters,
                progress_handler=progress_handler
            )
            duration = time.time() - start_time
            print(f"✅ {tool_name} completed in {duration:.2f}s")

            # Extract content from CallToolResult
            if hasattr(result, 'content') and result.content:
                if isinstance(result.content[0].text, str):
                    try:
                        return json.loads(result.content[0].text)
                    except json.JSONDecodeError:
                        return {"text": result.content[0].text}
                else:
                    return result.content[0].text
            else:
                return {"error": "No content in result"}

        except KeyboardInterrupt:
            print(f"\n⏸️ {tool_name} interrupted by the user")
            # In a real implementation, here we would pause the task
            raise
        except Exception as e:
            duration = time.time() - start_time
            print(f"❌ {tool_name} failed after {duration:.2f}s: {e}")
            raise

    async def list_session_tasks(self) -> Dict[str, Any]:
        """List session tasks."""
        try:
            parameters = {}
            if self.session_state:
                parameters['client_id'] = self.session_state.client_id
            result = await self.client.call_tool("list_session_tasks", parameters)
            # Extract content from CallToolResult
            if hasattr(result, 'content') and result.content:
                if isinstance(result.content[0].text, str):
                    try:
                        return json.loads(result.content[0].text)
                    except json.JSONDecodeError:
                        return {"text": result.content[0].text}
                else:
                    return result.content[0].text
            else:
                return {"error": "No content in result"}
        except Exception as e:
            print(f"❌ Error listing session tasks: {e}")
            return {"error": str(e)}

    async def pause_task(self, task_id: str) -> Dict[str, Any]:
        """Pause a specific task."""
        try:
            result = await self.client.call_tool("pause_task", {"task_id": task_id})
            # Extract content from CallToolResult
            if hasattr(result, 'content') and result.content:
                if isinstance(result.content[0].text, str):
                    try:
                        return json.loads(result.content[0].text)
                    except json.JSONDecodeError:
                        return {"text": result.content[0].text}
                else:
                    return result.content[0].text
            else:
                return {"error": "No content in result"}
        except Exception as e:
            print(f"❌ Error pausing task {task_id}: {e}")
            return {"error": str(e)}

    async def get_checkpoint_stats(self) -> Dict[str, Any]:
        """Get checkpoint statistics."""
        try:
            result = await self.client.call_tool("get_checkpoint_stats", {})
            # Extract content from CallToolResult
            if hasattr(result, 'content') and result.content:
                if isinstance(result.content[0].text, str):
                    try:
                        return json.loads(result.content[0].text)
                    except json.JSONDecodeError:
                        return {"text": result.content[0].text}
                else:
                    return result.content[0].text
            else:
                return {"error": "No content in result"}
        except Exception as e:
            print(f"❌ Error getting checkpoint statistics: {e}")
            return {"error": str(e)}

    def show_session_summary(self):
        """Show session summary."""
        if not self.session_state:
            print("❌ No session state available")
            return

        print("\n" + "="*60)
        print("📋 SESSION SUMMARY")
        print("="*60)
        print(f"🆔 Client ID (persistent): {self.session_state.client_id}")
        print(f"📌 Session ID (server): {self.session_state.session_id}")
        print(f"🔗 Server: {self.session_state.server_url}")
        print(f"📅 Created: {self.session_state.created_at.strftime('%Y-%m-%d %H:%M:%S')}")

        print(f"\n🔄 Active Tasks: {len(self.session_state.active_tasks)}")
        for task_id, task in self.session_state.active_tasks.items():
            progress = task.progress_percent
            print(f"   • {task.task_name} ({task_id[:8]}...): {progress:.1f}%")
            print(f"     └─ {task.last_message}")

        print(f"\n✅ Completed Tasks: {len(self.session_state.completed_tasks)}")
        for task_id in self.session_state.completed_tasks[-5:]:  # Last 5
            print(f"   • {task_id[:8]}...")


async def demo_resumable_data_processing(client: MCPResumableClient):
    """Demo of resumable data processing."""
    print("\n" + "="*60)
    print("📊 DEMO: Resumable Data Processing")
    print("="*60)

    result = await client.call_resumable_tool(
        "resumable_data_processing",
        {
            "dataset_name": "customer_data",
            "total_records": 50,
            "batch_size": 5
        }
    )

    if "error" not in result:
        print(f"📊 Result: {result.get('total_processed', 0)} records processed")
        print(f"🆔 Task ID: {result.get('task_id', 'N/A')}")


async def interactive_demo(client: MCPResumableClient):
    """Interactive demo with options."""
    while True:
        print("\n" + "="*60)
        print("🎮 INTERACTIVE DEMO - Resumable Client")
        print("="*60)
        print("1. Resumable data processing")
        print("2. List session tasks")
        print("3. Checkpoint statistics")
        print("4. Session summary")
        print("0. Exit")
        print("-" * 60)

        try:
            choice = input("Select an option (0-4): ").strip()

            if choice == "0":
                break
            elif choice == "1":
                await demo_resumable_data_processing(client)
            elif choice == "2":
                tasks = await client.list_session_tasks()
                print(f"📋 Session tasks: {json.dumps(tasks, indent=2)}")
            elif choice == "3":
                stats = await client.get_checkpoint_stats()
                print(f"📊 Checkpoint statistics: {json.dumps(stats, indent=2)}")
            elif choice == "4":
                client.show_session_summary()
            else:
                print("❌ Invalid option")

        except KeyboardInterrupt:
            print("\n⏸️ Demo interrupted")
            break
        except Exception as e:
            print(f"❌ Error: {e}")


async def run_resumable_demo():
    """Run resumable demo."""
    print("🌟 Resumable Client MCP - Demo")
    print("="*60)

    try:
        async with MCPResumableClient() as client:
            # Test connection
            if not await client.test_connection():
                print("❌ Could not connect to the resumable server.")
                print("   Make sure the server is running on port 8001")
                return

            # Show session state
            client.show_session_summary()

            # Run interactive demo
            await interactive_demo(client)

            print("\n🎉 Demo completed")

    except Exception as e:
        print(f"❌ Error in the demo: {e}")


if __name__ == "__main__":
    try:
        asyncio.run(run_resumable_demo())
    except KeyboardInterrupt:
        print("\n⏹️ Demo interrupted by the user")
    except Exception as e:
        print(f"❌ Error running demo: {e}")
Overwriting MCP_resumable_client/client.py
Execution
We are going to start the server, start the client, and ask the server to run a task. Then we will stop both the client and the server to simulate a lost connection, and finally start them again to resume the task where we left off.
First run - interrupted process
Before running anything, let's look at the contents of the server and client folders.
First, the server folder
!cd MCP_resumable_server && ls -lha
total 336
drwxr-xr-x@  11 macm1  staff   352B Aug 25 10:26 .
drwxr-xr-x  112 macm1  staff   3.5K Aug 25 09:56 ..
-rw-r--r--@   1 macm1  staff     5B Aug 25 09:37 .python-version
drwxr-xr-x@   8 macm1  staff   256B Aug 25 09:38 .venv
-rw-r--r--@   1 macm1  staff     0B Aug 25 09:37 README.md
drwxr-xr-x@   3 macm1  staff    96B Aug 25 09:42 __pycache__
-rw-r--r--@   1 macm1  staff   8.4K Aug 25 09:40 checkpoint_manager.py
-rw-r--r--@   1 macm1  staff    98B Aug 25 09:37 main.py
-rw-r--r--@   1 macm1  staff   213B Aug 25 09:38 pyproject.toml
-rw-r--r--@   1 macm1  staff   9.2K Aug 25 09:42 server.py
-rw-r--r--@   1 macm1  staff   128K Aug 25 09:38 uv.lock
We have .python-version, .venv, README.md, main.py, pyproject.toml and uv.lock, which uv created when we set up the project, plus __pycache__, which Python generates when it imports our modules. We also have checkpoint_manager.py and server.py, the files we wrote for the resumable MCP server.
Now let's see which files are in the client folder
!cd MCP_resumable_client && ls -lha
total 336
drwxr-xr-x@   9 macm1  staff   288B Aug 25 10:26 .
drwxr-xr-x  112 macm1  staff   3.5K Aug 25 09:56 ..
-rw-r--r--@   1 macm1  staff     5B Aug 25 09:56 .python-version
drwxr-xr-x@   8 macm1  staff   256B Aug 25 09:57 .venv
-rw-r--r--@   1 macm1  staff     0B Aug 25 09:56 README.md
-rw-r--r--@   1 macm1  staff    20K Aug 25 10:24 client.py
-rw-r--r--@   1 macm1  staff    98B Aug 25 09:56 main.py
-rw-r--r--@   1 macm1  staff   190B Aug 25 09:57 pyproject.toml
-rw-r--r--@   1 macm1  staff   128K Aug 25 09:57 uv.lock
As before, we have .python-version, .venv, README.md, main.py, pyproject.toml and uv.lock, which uv created when we set up the project. We also have client.py, the file we wrote for the resumable MCP client.
With that checked, we start the server
!cd MCP_resumable_server && source .venv/bin/activate && uv run server.py
🚀 Starting MCP server with resumability at 127.0.0.1:8001
✅ Resumable server ready at http://127.0.0.1:8001/mcp/
🔧 Tools with resumability:
- resumable_data_processing: Processing with checkpoints
- pause_task: Pause tasks
- list_session_tasks: List session tasks
- get_checkpoint_stats: Checkpoint statistics
- get_session_info: Current session information
INFO: Started server process [47049]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8001 (Press CTRL+C to quit)
Now we run the client, select option 1 (Resumable data processing) and stop it halfway through
!cd MCP_resumable_client && source .venv/bin/activate && uv run client.py
🌟 Resumable Client MCP - Demo
============================================================
💬 No session state found, starting new session
🏗️ Creating new session
🔗 Session ID of the server: 6bf64139282e41e0a2233ec92647bf02
🆔 Persistent Client ID: f0af843a-9b0e-42
✅ Connection established with the resumable server
============================================================
📋 SESSION SUMMARY
============================================================
🆔 Client ID (persistent): f0af843a-9b0e-42
📌 Session ID (server): 6bf64139282e41e0a2233ec92647bf02
🔗 Server: http://localhost:8001/mcp/
📅 Created: 2025-08-25 10:34:53
🔄 Active Tasks: 0
✅ Completed Tasks: 0
============================================================
🎮 INTERACTIVE DEMO - Resumable Client
============================================================
1. Resumable data processing
2. List session tasks
3. Checkpoint statistics
4. Session summary
0. Exit
------------------------------------------------------------
Select an option (0-4): 1
============================================================
📊 DEMO: Resumable Data Processing
============================================================
🚀 Executing resumable_data_processing (resumption: ✅)
[08/25/25 10:35:19] INFO Server log: 🆕 Starting new processing of 50 records  logging.py:40
📊 resumable_data_processing: |███░░░░░░░░░░░░░░░░░░░░░░░░░░░| 10.0% (5/50) - Processed 5/50 records [1.0s]
📊 resumable_data_processing: |██████░░░░░░░░░░░░░░░░░░░░░░░░| 20.0% (10/50) - Processed 10/50 records [2.0s]
📊 resumable_data_processing: |█████████░░░░░░░░░░░░░░░░░░░░░| 30.0% (15/50) - Processed 15/50 records [3.0s]
^C
⏹️ Demo interrupted by the user
We stop the server as well
Now that both the server and the client are stopped, let's look again at what is in each folder
First, the server folder
!cd MCP_resumable_server && ls -lha
total 336
drwxr-xr-x@  12 macm1  staff   384B Aug 25 10:33 .
drwxr-xr-x  112 macm1  staff   3.5K Aug 25 09:56 ..
-rw-r--r--@   1 macm1  staff     5B Aug 25 09:37 .python-version
drwxr-xr-x@   8 macm1  staff   256B Aug 25 09:38 .venv
-rw-r--r--@   1 macm1  staff     0B Aug 25 09:37 README.md
drwxr-xr-x@   3 macm1  staff    96B Aug 25 09:42 __pycache__
-rw-r--r--@   1 macm1  staff   8.4K Aug 25 09:40 checkpoint_manager.py
drwxr-xr-x@   4 macm1  staff   128B Aug 25 10:35 checkpoints
-rw-r--r--@   1 macm1  staff    98B Aug 25 09:37 main.py
-rw-r--r--@   1 macm1  staff   213B Aug 25 09:38 pyproject.toml
-rw-r--r--@   1 macm1  staff   9.2K Aug 25 09:42 server.py
-rw-r--r--@   1 macm1  staff   128K Aug 25 09:38 uv.lock
We can see that, in addition to everything that was there before, there is now a folder called checkpoints, which holds the checkpoints of the tasks that have been run.
!cd MCP_resumable_server/checkpoints && ls -lha
total 8
drwxr-xr-x@   4 macm1  staff   128B Aug 25 10:35 .
drwxr-xr-x@  12 macm1  staff   384B Aug 25 10:33 ..
-rw-r--r--@   1 macm1  staff   1.1K Aug 25 10:35 checkpoints.json
drwxr-xr-x@   3 macm1  staff    96B Aug 25 10:35 data
If we look at the contents of the JSON file, we can see
"current_step": 15,
"total_steps": 50,
!cd MCP_resumable_server/checkpoints && cat checkpoints.json
{
  "9d7bc504c50e0bce": {
    "task_id": "9d7bc504c50e0bce",
    "session_id": "864eec3a-be1b-43",
    "task_name": "resumable_data_processing",
    "parameters": {
      "dataset_name": "customer_data",
      "total_records": 50,
      "batch_size": 5
    },
    "current_step": 15,
    "total_steps": 50,
    "status": "failed",
    "created_at": "2025-08-25T08:35:19.349142+00:00",
    "updated_at": "2025-08-25T08:35:23.366672+00:00",
    "data": {
      "last_batch": [
        {"record_id": "customer_data_000015", "processed_at": "step_15", "value": 88},
        {"record_id": "customer_data_000016", "processed_at": "step_16", "value": 17},
        {"record_id": "customer_data_000017", "processed_at": "step_17", "value": 596},
        {"record_id": "customer_data_000018", "processed_at": "step_18", "value": 693},
        {"record_id": "customer_data_000019", "processed_at": "step_19", "value": 34}
      ]
    },
    "error_message": ""
  }
}
That is, we stopped at step 15 of 50, which, if you look at the client output, is exactly where we interrupted the process.
📊 resumable_data_processing: |███░░░░░░░░░░░░░░░░░░░░░░░░░░░| 10.0% (5/50) - Processed 5/50 records [1.0s]
📊 resumable_data_processing: |██████░░░░░░░░░░░░░░░░░░░░░░░░| 20.0% (10/50) - Processed 10/50 records [2.0s]
📊 resumable_data_processing: |█████████░░░░░░░░░░░░░░░░░░░░░| 30.0% (15/50) - Processed 15/50 records [3.0s]
^C
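To make the checkpoint file easier to read, here is a minimal sketch that parses a trimmed copy of the checkpoints.json shown above and reports where each task would resume. The helper name summarize_checkpoints is hypothetical and is not part of checkpoint_manager.py; only the field names (current_step, total_steps, status) come from the file itself.

```python
import json

# Trimmed copy of the checkpoints.json shown above ("parameters" and "data" omitted).
CHECKPOINTS_JSON = """
{
  "9d7bc504c50e0bce": {
    "task_id": "9d7bc504c50e0bce",
    "session_id": "864eec3a-be1b-43",
    "task_name": "resumable_data_processing",
    "current_step": 15,
    "total_steps": 50,
    "status": "failed"
  }
}
"""


def summarize_checkpoints(raw: str) -> list:
    """Hypothetical helper: one summary per checkpoint with its resume point."""
    summaries = []
    for task_id, checkpoint in json.loads(raw).items():
        current = checkpoint["current_step"]
        total = checkpoint["total_steps"]
        summaries.append({
            "task_id": task_id,
            "resume_from": current,          # next run continues after this step
            "remaining": total - current,    # steps still to process
            "percent": 100 * current / total,
        })
    return summaries


summaries = summarize_checkpoints(CHECKPOINTS_JSON)
for s in summaries:
    print(f"{s['task_id']}: resume from step {s['resume_from']} "
          f"({s['percent']:.1f}% done, {s['remaining']} steps left)")
```

For the checkpoint above this reports a resume point of step 15, which matches the 30.0% shown in the last progress bar before the interruption.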
Now let's look at the contents of the client folder. We can see that there is now a file called session_state.json
!cd MCP_resumable_client && ls -lha
total 344
drwxr-xr-x@  10 macm1  staff   320B Aug 25 10:35 .
drwxr-xr-x  112 macm1  staff   3.5K Aug 25 09:56 ..
-rw-r--r--@   1 macm1  staff     5B Aug 25 09:56 .python-version
drwxr-xr-x@   8 macm1  staff   256B Aug 25 09:57 .venv
-rw-r--r--@   1 macm1  staff     0B Aug 25 09:56 README.md
-rw-r--r--@   1 macm1  staff    20K Aug 25 10:24 client.py
-rw-r--r--@   1 macm1  staff    98B Aug 25 09:56 main.py
-rw-r--r--@   1 macm1  staff   190B Aug 25 09:57 pyproject.toml
-rw-r--r--@   1 macm1  staff   546B Aug 25 10:35 session_state.json
-rw-r--r--@   1 macm1  staff   128K Aug 25 09:57 uv.lock
Let's see what is inside it
!cd MCP_resumable_client && cat session_state.json
{
  "client_id": "864eec3a-be1b-43",
  "session_id": "d34ff655d6464fb4ae5eeb64c62989b0",
  "server_url": "http://localhost:8001/mcp/",
  "active_tasks": {
    "task_1756110920": {
      "task_id": "task_1756110920",
      "task_name": "resumable_data_processing",
      "current_step": 15,
      "total_steps": 50,
      "last_message": "Processed 15/50 records",
      "started_at": "2025-08-25T10:35:20.353521",
      "last_update": "2025-08-25T10:35:22.364507"
    }
  },
  "completed_tasks": [],
  "created_at": "2025-08-25T10:35:16.230926"
}
Just as in the server's JSON file, we see
"current_step": 15,
"total_steps": 50,
The client has also recorded that we stopped at step 15 of 50, but the most important part is
"task_id": "task_1756110920",
This is where we store the task ID so that we can ask the server to resume it
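To illustrate why a file like session_state.json survives a restart, here is a minimal sketch of persisting and reloading client state. It is not the code from client.py: the function names load_or_create_state and record_progress are hypothetical, the 16-character client ID only mimics the format seen above, and just a subset of the fields is stored.

```python
import json
import tempfile
import uuid
from pathlib import Path


def load_or_create_state(path: Path) -> dict:
    """Hypothetical helper: reuse the saved state if present, else start fresh."""
    if path.exists():
        return json.loads(path.read_text(encoding="utf-8"))
    return {
        "client_id": str(uuid.uuid4())[:16],  # assumed format, mimics the ID above
        "active_tasks": {},
        "completed_tasks": [],
    }


def record_progress(path: Path, state: dict, task_id: str,
                    current_step: int, total_steps: int) -> None:
    """Hypothetical helper: update one task and write the state to disk."""
    state["active_tasks"][task_id] = {
        "task_id": task_id,
        "current_step": current_step,
        "total_steps": total_steps,
    }
    path.write_text(json.dumps(state, indent=2), encoding="utf-8")


with tempfile.TemporaryDirectory() as tmp:
    state_file = Path(tmp) / "session_state.json"

    # First run: no file yet, so a new client ID is generated and progress is saved.
    first = load_or_create_state(state_file)
    record_progress(state_file, first, "task_1756110920", 15, 50)

    # Second run (simulated restart): everything is read back from disk.
    second = load_or_create_state(state_file)
    same_client = first["client_id"] == second["client_id"]
    resumed_step = second["active_tasks"]["task_1756110920"]["current_step"]

print(f"same client ID: {same_client}, resume from step: {resumed_step}")
```

Writing the file after every progress update, rather than only on exit, is what lets the state survive an abrupt interruption such as Ctrl+C.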
Second run - resumed process
If we now start the server again
!cd MCP_resumable_server && source .venv/bin/activate && uv run server.py
🚀 Starting MCP server with resumability at 127.0.0.1:8001
✅ Resumable server ready at http://127.0.0.1:8001/mcp/
🔧 Tools with resumability:
- resumable_data_processing: Processing with checkpoints
- pause_task: Pause tasks
- list_session_tasks: List session tasks
- get_checkpoint_stats: Checkpoint statistics
- get_session_info: Current session information
INFO: Started server process [52338]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8001 (Press CTRL+C to quit)
And run the client again
!cd MCP_resumable_client && source .venv/bin/activate && uv run client.py
🌟 Resumable Client MCP - Demo
============================================================
📂 Session state loaded: d34ff655d6464fb4ae5eeb64c62989b0
🆔 Reusing Persistent Client ID: 864eec3a-be1b-43
⚠️ Session ID of the server changed: d34ff655d6464fb4ae5eeb64c62989b0 → 004bc91edde144898938136d0d2b5041
✅ Connection established with the resumable server
============================================================
📋 SESSION SUMMARY
============================================================
🆔 Client ID (persistent): 864eec3a-be1b-43
📌 Session ID (server): 004bc91edde144898938136d0d2b5041
🔗 Server: http://localhost:8001/mcp/
📅 Created: 2025-08-25 10:35:16
🔄 Active Tasks: 1
• resumable_data_processing (task_175...): 30.0%
└─ Processed 15/50 records
✅ Completed Tasks: 0
============================================================
🎮 INTERACTIVE DEMO - Resumable Client
============================================================
1. Resumable data processing
2. List session tasks
3. Checkpoint statistics
4. Session summary
0. Exit
------------------------------------------------------------
Select an option (0-4):
We can see that it says
🔄 Active Tasks: 1
• resumable_data_processing (task_175...): 30.0%
└─ Processed 15/50 records
The client has recovered the information that we stopped at step 15 of 50 of the task with ID task_1756110920
Now we select option 1 again and see that it continues from step 15
!cd MCP_resumable_client && source .venv/bin/activate && uv run client.py
🌟 Resumable Client MCP - Demo
============================================================
📂 Session state loaded: 004bc91edde144898938136d0d2b5041
🆔 Reusing Persistent Client ID: 864eec3a-be1b-43
⚠️ Session ID of the server changed: 004bc91edde144898938136d0d2b5041 → 4b93c36446504a3d92467d39a8cba589
✅ Connection established with the resumable server
============================================================
📋 SESSION SUMMARY
============================================================
🆔 Client ID (persistent): 864eec3a-be1b-43
📌 Session ID (server): 4b93c36446504a3d92467d39a8cba589
🔗 Server: http://localhost:8001/mcp/
📅 Created: 2025-08-25 10:35:16
🔄 Active Tasks: 1
• resumable_data_processing (task_175...): 30.0%
└─ Processed 15/50 records
✅ Completed Tasks: 0
============================================================
🎮 INTERACTIVE DEMO - Resumable Client
============================================================
1. Resumable data processing
2. List session tasks
3. Checkpoint statistics
4. Session summary
0. Exit
------------------------------------------------------------
Select an option (0-4): 1
============================================================
📊 DEMO: Resumable Data Processing
============================================================
🚀 Executing resumable_data_processing (resumption: ✅)
[08/25/25 10:57:31] INFO Server log: 🔄 Resuming processing from record 15  logging.py:40
📊 resumable_data_processing: |████████████░░░░░░░░░░░░░░░░░░| 40.0% (20/50) - Processed 20/50 records [1.0s]
📊 resumable_data_processing: |███████████████░░░░░░░░░░░░░░░| 50.0% (25/50) - Processed 25/50 records [2.0s]
📊 resumable_data_processing: |██████████████████░░░░░░░░░░░░| 60.0% (30/50) - Processed 30/50 records [3.0s]
📊 resumable_data_processing: |█████████████████████░░░░░░░░░| 70.0% (35/50) - Processed 35/50 records [4.0s]
📊 resumable_data_processing: |████████████████████████░░░░░░| 80.0% (40/50) - Processed 40/50 records [5.0s]
📊 resumable_data_processing: |███████████████████████████░░░| 90.0% (45/50) - Processed 45/50 records [6.0s]
📊 resumable_data_processing: |██████████████████████████████| 100.0% (50/50) - Processed 50/50 records [7.0s]
[08/25/25 10:57:38] INFO Server log: ✅ Processing completed: 55 records  logging.py:40
✅ resumable_data_processing completed in 7.05s
📊 Result: 55 records processed
🆔 Task ID: 88e68f3bdf65cd22
============================================================
🎮 INTERACTIVE DEMO - Resumable Client
============================================================
1. Resumable data processing
2. List session tasks
3. Checkpoint statistics
4. Session summary
0. Exit
------------------------------------------------------------
Select an option (0-4): 0
🎉 Demo completed
We can see that the server log says "Resuming processing from record 15", that is, it picked up where it left off
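The resume behaviour seen in this run can be sketched in a few lines, independently of MCP. This is a simplified illustration rather than the logic in server.py: the function process and its stop_after parameter (used only to simulate the interruption) are hypothetical, and the checkpoint stores just the two step counters.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


def process(checkpoint: Path, total: int, batch: int,
            stop_after: Optional[int] = None) -> int:
    """Process `total` records in batches, saving a checkpoint after each batch.

    Starts from the step stored in `checkpoint` if the file exists, else from 0.
    `stop_after` simulates an interruption once that step has been reached.
    Returns the last step that was checkpointed.
    """
    step = 0
    if checkpoint.exists():
        step = json.loads(checkpoint.read_text(encoding="utf-8"))["current_step"]

    while step < total:
        step = min(step + batch, total)  # "process" one batch of records
        checkpoint.write_text(
            json.dumps({"current_step": step, "total_steps": total}),
            encoding="utf-8",
        )
        if stop_after is not None and step >= stop_after:
            break  # simulated crash or lost connection
    return step


with tempfile.TemporaryDirectory() as tmp:
    checkpoint_file = Path(tmp) / "checkpoint.json"

    # First run: interrupted after record 15, as in the demo.
    first_run = process(checkpoint_file, total=50, batch=5, stop_after=15)

    # Second run: reads the checkpoint and continues instead of starting over.
    resume_point = json.loads(checkpoint_file.read_text(encoding="utf-8"))["current_step"]
    second_run = process(checkpoint_file, total=50, batch=5)

print(f"first run stopped at {first_run}, resumed from {resume_point}, finished at {second_run}")
```

Saving the checkpoint after every batch means that at most one batch of work is repeated after a failure.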
This way we can build MCP servers and clients that process long tasks and resume them if they are interrupted.
It is also useful for very long tasks in general: the client does not need to stay running and waiting. Instead, it can ask the server to start the task and then check on its status from time to time.
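The polling idea can be sketched as follows. Nothing here talks to a real MCP server: FakeTaskServer is a hypothetical in-process stand-in, and get_status is an assumed method name. In the setup from this post, the status check would presumably be a tool call such as list_session_tasks instead.

```python
import asyncio


class FakeTaskServer:
    """Hypothetical stand-in for the server: runs a task in the background."""

    def __init__(self, total_steps: int):
        self.current_step = 0
        self.total_steps = total_steps

    async def run(self, step_delay: float = 0.01) -> None:
        # Advance one step at a time, yielding control between steps.
        while self.current_step < self.total_steps:
            await asyncio.sleep(step_delay)
            self.current_step += 1

    def get_status(self) -> dict:
        done = self.current_step >= self.total_steps
        return {
            "current_step": self.current_step,
            "total_steps": self.total_steps,
            "status": "completed" if done else "running",
        }


async def poll_until_done(server: FakeTaskServer, interval: float = 0.02) -> list:
    """Ask for the task status every `interval` seconds until it completes."""
    history = []
    while True:
        status = server.get_status()
        history.append(status)
        if status["status"] == "completed":
            return history
        await asyncio.sleep(interval)  # the client is free to do other work here


async def main() -> list:
    server = FakeTaskServer(total_steps=10)
    worker = asyncio.create_task(server.run())  # the task runs on its own
    history = await poll_until_done(server)
    await worker
    return history


history = asyncio.run(main())
print(f"polled {len(history)} times, final status: {history[-1]}")
```

The polling interval is a trade-off: a shorter interval gives fresher progress information at the cost of more requests to the server.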