No post anterior streamable MCP, vimos como fazer com que o servidor MCP envie as informações do processo que está sendo realizado, para que o cliente possa mostrar essas informações ao usuário.
Isso é útil quando o processo realizado pelo servidor MCP é longo.
Mas, e se esse processo for interrompido? Por causa de uma queda do servidor, perda de conexão, etc. Teríamos que pedir novamente ao servidor MCP para reiniciar o processo.
Para evitar que isso aconteça, vamos explicar como criar um servidor e um cliente MCP que possam continuar com o processo que está sendo realizado. Assim, se ele cair por qualquer motivo, será possível retomar o processo de onde parou.
Servidor MCP resumível
Este servidor é muito semelhante ao que criamos na publicação streamable MCP, só que também criamos pontos de verificação para poder retomar um processo caso ele seja interrompido.
Então, vamos ver como implementá-lo.
Implementar servidor MCP resumable
Criar ambiente virtual
Primeiro criamos a pasta onde vamos desenvolvê-lo.
!mkdir MCP_resumable_server
Criamos o ambiente com uv
!cd MCP_resumable_server && uv init .
Initialized project `mcp-resumable-server` at `/Users/macm1/Documents/web/portafolio/posts/MCP_resumable_server`
Iniciamos o ambiente
!cd MCP_resumable_server && uv venv
Using CPython 3.12.8Creating virtual environment at: .venvActivate with: source .venv/bin/activate
Instalamos as bibliotecas necessárias
!cd MCP_resumable_server && uv add fastmcp uvicorn
Resolved 64 packages in 548ms⠙ Preparing packages... (0/1) ⠋ Preparing packages... (0/0)⠙ Preparing packages... (0/1)-------------- 0 B/71.28 KiB⠙ Preparing packages... (0/1)-------------- 16.00 KiB/71.28 KiB⠙ Preparing packages... (0/1)-------------- 32.00 KiB/71.28 KiB⠙ Preparing packages... (0/1)---------- 48.00 KiB/71.28 KiB⠙ Preparing packages... (0/1)---------- 64.00 KiB/71.28 KiBPrepared 1 package in 134msInstalled 61 packages in 152ms+ annotated-types==0.7.0+ anyio==4.10.0+ attrs==25.3.0+ authlib==1.6.2+ certifi==2025.8.3+ cffi==1.17.1+ charset-normalizer==3.4.3+ click==8.2.1+ cryptography==45.0.6+ cyclopts==3.22.5+ dnspython==2.7.0+ docstring-parser==0.17.0+ docutils==0.22+ email-validator==2.2.0+ exceptiongroup==1.3.0+ fastmcp==2.11.3+ h11==0.16.0+ httpcore==1.0.9+ httpx==0.28.1+ httpx-sse==0.4.1+ idna==3.10+ isodate==0.7.2+ jsonschema==4.25.1+ jsonschema-path==0.3.4+ jsonschema-specifications==2025.4.1+ lazy-object-proxy==1.12.0+ markdown-it-py==4.0.0+ markupsafe==3.0.2+ mcp==1.13.1+ mdurl==0.1.2+ more-itertools==10.7.0+ openapi-core==0.19.5+ openapi-pydantic==0.5.1+ openapi-schema-validator==0.6.3+ openapi-spec-validator==0.7.2+ parse==1.20.2+ pathable==0.4.4+ pycparser==2.22+ pydantic==2.11.7+ pydantic-core==2.33.2+ pydantic-settings==2.10.1+ pygments==2.19.2+ pyperclip==1.9.0+ python-dotenv==1.1.1+ python-multipart==0.0.20+ pyyaml==6.0.2+ referencing==0.36.2+ requests==2.32.5+ rfc3339-validator==0.1.4+ rich==14.1.0+ rich-rst==1.3.1+ rpds-py==0.27.0+ six==1.17.0+ sniffio==1.3.1+ sse-starlette==3.0.2+ starlette==0.47.3+ typing-extensions==4.14.1+ typing-inspection==0.4.1+ urllib3==2.5.0+ uvicorn==0.35.0+ werkzeug==3.1.1
Código do servidor
Agora vamos escrever o código necessário para o servidor MCP resumable.
Checkpoint manager
Como já dissemos, a maior diferença em relação ao servidor da publicação anterior streamable MCP é que, neste, vamos guardar o estado em pontos de verificação para poder retomar o processo caso ele seja interrompido. Então, vamos criar o gerenciador de pontos de verificação.
%%writefile MCP_resumable_server/checkpoint_manager.py#!/usr/bin/env python3"""Checkpoint and resumability system for MCP streaming tasks.Allows saving the state of long tasks and resuming them from where they were interrupted."""import jsonimport pickleimport hashlibfrom datetime import datetime, timezonefrom pathlib import Pathfrom typing import Any, Dict, List, Optionalfrom dataclasses import dataclass, asdictfrom enum import Enumclass TaskStatus(Enum):"""Task states."""PENDING = "pending"RUNNING = "running"PAUSED = "paused"COMPLETED = "completed"FAILED = "failed"CANCELLED = "cancelled"@dataclassclass TaskCheckpoint:"""Represents a checkpoint of a task."""task_id: strsession_id: strtask_name: strparameters: Dict[str, Any]current_step: inttotal_steps: intstatus: TaskStatuscreated_at: datetimeupdated_at: datetimedata: Dict[str, Any] # Estado específico de la tareaerror_message: Optional[str] = Nonedef to_dict(self) -> Dict[str, Any]:"""Convert to serializable dictionary."""data = asdict(self)data['status'] = data['status'].valuedata['created_at'] = data['created_at'].isoformat()data['updated_at'] = data['updated_at'].isoformat()return data@classmethoddef from_dict(cls, data: Dict[str, Any]) -> 'TaskCheckpoint':"""Create from dictionary."""data['status'] = TaskStatus(data['status'])data['created_at'] = datetime.fromisoformat(data['created_at'])data['updated_at'] = datetime.fromisoformat(data['updated_at'])return cls(**data)class CheckpointManager:"""Checkpoint manager for streaming tasks."""def __init__(self, storage_dir: str = "checkpoints"):self.storage_dir = Path(storage_dir)self.storage_dir.mkdir(exist_ok=True)self.checkpoints_file = self.storage_dir / "checkpoints.json"self.data_dir = self.storage_dir / "data"self.data_dir.mkdir(exist_ok=True)self._checkpoints: Dict[str, TaskCheckpoint] = {}self._load_checkpoints()def _load_checkpoints(self):"""Load checkpoints from disk."""if self.checkpoints_file.exists():try:with open(self.checkpoints_file, 'r') as f:data = json.load(f)self._checkpoints = {task_id: TaskCheckpoint.from_dict(checkpoint_data)for task_id, checkpoint_data in data.items()}except Exception as e:print(f"❌ Error loading checkpoints: {e}")self._checkpoints = {}def _save_checkpoints(self):"""Save checkpoints to disk."""try:data = {task_id: checkpoint.to_dict()for task_id, checkpoint in self._checkpoints.items()}with open(self.checkpoints_file, 'w') as f:json.dump(data, f, indent=2)except Exception as e:print(f"❌ Error saving checkpoints: {e}")def generate_task_id(self, session_id: str, task_name: str, parameters: Dict[str, Any]) -> str:"""Generate unique ID for a task."""# Create hash based on session, task and parametersdata = f"{session_id}-{task_name}-{json.dumps(parameters, sort_keys=True)}"return hashlib.sha256(data.encode()).hexdigest()[:16]def create_checkpoint(self,session_id: str,task_name: str,parameters: Dict[str, Any],total_steps: int,initial_data: Optional[Dict[str, Any]] = None) -> str:"""Create new checkpoint."""task_id = self.generate_task_id(session_id, task_name, parameters)checkpoint = TaskCheckpoint(task_id=task_id,session_id=session_id,task_name=task_name,parameters=parameters,current_step=0,total_steps=total_steps,status=TaskStatus.PENDING,created_at=datetime.now(timezone.utc),updated_at=datetime.now(timezone.utc),data=initial_data or {})self._checkpoints[task_id] = checkpointself._save_checkpoints()return task_iddef update_checkpoint(self,task_id: str,current_step: int,status: TaskStatus,data: Optional[Dict[str, Any]] = None,error_message: Optional[str] = None) -> bool:"""Update existing checkpoint."""if task_id not in self._checkpoints:return Falsecheckpoint = self._checkpoints[task_id]checkpoint.current_step = current_stepcheckpoint.status = statuscheckpoint.updated_at = datetime.now(timezone.utc)if data is not None:checkpoint.data.update(data)if error_message is not None:checkpoint.error_message = error_messageself._save_checkpoints()return Truedef get_checkpoint(self, task_id: str) -> Optional[TaskCheckpoint]:"""Get checkpoint by ID."""return self._checkpoints.get(task_id)def find_resumable_task(self,session_id: str,task_name: str,parameters: Dict[str, Any]) -> Optional[TaskCheckpoint]:"""Find resumable task with the same parameters."""task_id = self.generate_task_id(session_id, task_name, parameters)checkpoint = self._checkpoints.get(task_id)if checkpoint and checkpoint.status in [TaskStatus.PAUSED, TaskStatus.RUNNING]:return checkpointreturn Nonedef get_session_checkpoints(self, session_id: str) -> List[TaskCheckpoint]:"""Get all checkpoints of a session."""return [checkpoint for checkpoint in self._checkpoints.values()if checkpoint.session_id == session_id]def save_task_data(self, task_id: str, data: Any) -> bool:"""Save specific task data."""try:data_file = self.data_dir / f"{task_id}.pkl"with open(data_file, 'wb') as f:pickle.dump(data, f)return Trueexcept Exception as e:print(f"❌ Error saving task data {task_id}: {e}")return Falsedef load_task_data(self, task_id: str) -> Any:"""Load specific task data."""try:data_file = self.data_dir / f"{task_id}.pkl"if data_file.exists():with open(data_file, 'rb') as f:return pickle.load(f)return Noneexcept Exception as e:print(f"❌ Error loading task data {task_id}: {e}")return Nonedef delete_checkpoint(self, task_id: str) -> bool:"""Delete checkpoint and associated data."""try:# Delete checkpointif task_id in self._checkpoints:del self._checkpoints[task_id]self._save_checkpoints()# Delete datadata_file = self.data_dir / f"{task_id}.pkl"if data_file.exists():data_file.unlink()return Trueexcept Exception as e:print(f"❌ Error deleting checkpoint {task_id}: {e}")return Falsedef cleanup_old_checkpoints(self, max_age_days: int = 7) -> int:"""Clean up old checkpoints."""cutoff_date = datetime.now(timezone.utc).replace(day=datetime.now(timezone.utc).day - max_age_days)to_delete = []for task_id, checkpoint in self._checkpoints.items():if (checkpoint.status in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED]and checkpoint.updated_at < cutoff_date):to_delete.append(task_id)for task_id in to_delete:self.delete_checkpoint(task_id)return len(to_delete)def get_stats(self) -> Dict[str, int]:"""Get checkpoint statistics."""stats = {status.value: 0 for status in TaskStatus}for checkpoint in self._checkpoints.values():stats[checkpoint.status.value] += 1return stats# Singleton global for checkpoint managercheckpoint_manager = CheckpointManager()
Writing MCP_resumable_server/checkpoint_manager.py
Servidor
Agora vamos criar o código do servidor.
%%writefile MCP_resumable_server/server.py#!/usr/bin/env python3"""Server MCP with resumability and checkpoints.Demo how to implement tasks that can be paused and resumed."""import asyncioimport uvicornimport randomfrom typing import Dict, List, Optional, Anyfrom fastmcp import FastMCP, Contextfrom fastmcp.server.http import create_streamable_http_appfrom checkpoint_manager import checkpoint_manager, TaskStatus, TaskCheckpoint# Create instance of the MCP servermcp = FastMCP(name="Resumable Server",instructions="MCP server with resumability and checkpoints for long tasks")@mcp.toolasync def resumable_data_processing(dataset_name: str = "default_dataset",total_records: int = 100,batch_size: int = 10,client_id: str = None, # Add persistent client_idcontext: Context = None) -> Dict[str, Any]:"""Resumable data processing with automatic checkpoints.Args:dataset_name: Name of the dataset to processtotal_records: Total records to processbatch_size: Size of the batch per iterationclient_id: Persistent client id for resumability"""if not context:return {"error": "Requires context for resumability"}# Use persistent client id if available, otherwise use session_idpersistent_id = client_id if client_id else context.session_idif not persistent_id:return {"error": "Requires client_id or session_id for resumability"}task_name = "resumable_data_processing"parameters = {"dataset_name": dataset_name,"total_records": total_records,"batch_size": batch_size}# Search for existing task to resume - include interrupted taskstask_id = checkpoint_manager.generate_task_id(persistent_id, task_name, parameters)existing_checkpoint = checkpoint_manager.get_checkpoint(task_id)if existing_checkpoint and existing_checkpoint.current_step > 0:# Resume from checkpointawait context.info(f"🔄 Resuming processing from record {existing_checkpoint.current_step}")start_record = existing_checkpoint.current_stepprocessed_data = checkpoint_manager.load_task_data(task_id) or []# Mark as runningcheckpoint_manager.update_checkpoint(task_id, start_record, TaskStatus.RUNNING)else:# New taskawait context.info(f"🆕 Starting new processing of {total_records} records")if not existing_checkpoint:task_id = checkpoint_manager.create_checkpoint(persistent_id, task_name, parameters, total_records)start_record = 0processed_data = []# Mark as runningcheckpoint_manager.update_checkpoint(task_id, 0, TaskStatus.RUNNING)try:# Process in batchesfor record_num in range(start_record, total_records, batch_size):batch_end = min(record_num + batch_size, total_records)# Simulate batch processingawait asyncio.sleep(1) # Simulate work# Process batchbatch_results = []for i in range(record_num, batch_end):# Simulate record processingresult = {"record_id": f"{dataset_name}_{i:06d}","processed_at": f"step_{i}","value": random.randint(1, 1000)}batch_results.append(result)processed_data.extend(batch_results)# Save checkpoint for each batchcheckpoint_manager.update_checkpoint(task_id,batch_end,TaskStatus.RUNNING,{"last_batch": batch_results})# Save processed datacheckpoint_manager.save_task_data(task_id, processed_data)# Report progressawait context.report_progress(progress=batch_end,total=total_records,message=f"Processed {batch_end}/{total_records} records")await context.debug(f"Batch {record_num}-{batch_end-1} completed")# Mark as completedcheckpoint_manager.update_checkpoint(task_id, total_records, TaskStatus.COMPLETED)await context.info(f"✅ Processing completed: {len(processed_data)} records")return {"task_id": task_id,"dataset_name": dataset_name,"total_processed": len(processed_data),"records": processed_data[:5], # Show first 5"status": "completed"}except Exception as e:# Mark as failedcheckpoint_manager.update_checkpoint(task_id, record_num, TaskStatus.FAILED, error_message=str(e))await context.error(f"❌ Error in processing: {e}")raise@mcp.toolasync def pause_task(task_id: str,context: Context = None) -> Dict[str, str]:"""Pause a task in execution.Args:task_id: ID of the task to pause"""checkpoint = checkpoint_manager.get_checkpoint(task_id)if not checkpoint:return {"error": f"Task {task_id} not found"}if checkpoint.status != TaskStatus.RUNNING:return {"error": f"Task {task_id} is not running"}success = checkpoint_manager.update_checkpoint(task_id, checkpoint.current_step, TaskStatus.PAUSED)if success:if context:await context.info(f"⏸️ Task {task_id} paused")return {"message": f"Task {task_id} paused correctly"}else:return {"error": f"Could not pause task {task_id}"}@mcp.toolasync def list_session_tasks(client_id: str = None, # Add persistent client_idcontext: Context = None) -> Dict[str, Any]:"""List all tasks of the current session or the specified client.Args:client_id: Persistent client id (optional)"""if not context:return {"error": "Requires context"}# Use persistent client id if available, otherwise use session_idpersistent_id = client_id if client_id else context.session_idif not persistent_id:return {"error": "Requires client_id or session_id"}checkpoints = checkpoint_manager.get_session_checkpoints(persistent_id)tasks = []for checkpoint in checkpoints:task_info = {"task_id": checkpoint.task_id,"task_name": checkpoint.task_name,"status": checkpoint.status.value,"progress": f"{checkpoint.current_step}/{checkpoint.total_steps}","progress_percent": round((checkpoint.current_step / checkpoint.total_steps) * 100, 1) if checkpoint.total_steps > 0 else 0,"created_at": checkpoint.created_at.isoformat(),"updated_at": checkpoint.updated_at.isoformat()}tasks.append(task_info)return {"session_id": context.session_id,"persistent_id": persistent_id,"total_tasks": len(tasks),"tasks": tasks}@mcp.toolasync def get_checkpoint_stats(context: Context = None) -> Dict[str, Any]:"""Get global checkpoint statistics."""stats = checkpoint_manager.get_stats()return {"checkpoint_stats": stats,"total_checkpoints": sum(stats.values())}@mcp.toolasync def get_session_info(context: Context = None) -> Dict[str, Any]:"""Get information about the current session."""if not context:return {"error": "No context available"}return {"session_id": context.session_id,"request_id": context.request_id,"client_id": context.client_id}async def run_resumable_server(host: str = "127.0.0.1", port: int = 8001):"""Run server with resumability capabilities."""print(f"🚀 Starting MCP server with resumability at {host}:{port}")# Create applicationapp = create_streamable_http_app(server=mcp,streamable_http_path="/mcp/",stateless_http=False, # IMPORTANT: stateful for sessionsdebug=True)config = uvicorn.Config(app=app,host=host,port=port,log_level="info",access_log=False)server = uvicorn.Server(config)print(f"✅ Resumable server ready at http://{host}:{port}/mcp/")print("🔧 Tools with resumability:")print(" - resumable_data_processing: Processing with checkpoints")# print(" - large_file_download: Download resumable")# print(" - machine_learning_training: ML training resumable")print(" - pause_task: Pause tasks")print(" - list_session_tasks: List session tasks")print(" - get_checkpoint_stats: Checkpoint statistics")print(" - get_session_info: Current session information")await server.serve()if __name__ == "__main__":try:asyncio.run(run_resumable_server())except KeyboardInterrupt:print(" ⏹️ Server stopped by user")except Exception as e:print(f"❌ Error running server: {e}")
Writing MCP_resumable_server/server.py
Cliente MCP resumível
Aqui também teremos um cliente muito semelhante ao do post anterior streamable MCP, mas vamos adicionar um json com as informações das sessões criadas. Assim, se uma sessão for interrompida no meio de um processo, será possível continuar de onde parou.
Implementar cliente MCP resumível
Criar o ambiente virtual para o cliente
Primeiro criamos a pasta onde vamos desenvolvê-lo.
!mkdir MCP_resumable_client
Criamos o ambiente com uv
!cd MCP_resumable_client && uv init .
Initialized project `mcp-resumable-client` at `/Users/macm1/Documents/web/portafolio/posts/MCP_resumable_client`
Iniciamos o ambiente
!cd MCP_resumable_client && uv venv
Using CPython 3.12.8Creating virtual environment at: .venvActivate with: source .venv/bin/activate
Instalamos as bibliotecas necessárias
!cd MCP_resumable_client && uv add fastmcp
Resolved 64 packages in 314msInstalled 61 packages in 145ms ░░░░░░░░░░░░░░░░░░░░ [0/0] Installing wheels...+ annotated-types==0.7.0+ anyio==4.10.0+ attrs==25.3.0+ authlib==1.6.2+ certifi==2025.8.3+ cffi==1.17.1+ charset-normalizer==3.4.3+ click==8.2.1+ cryptography==45.0.6+ cyclopts==3.22.5+ dnspython==2.7.0+ docstring-parser==0.17.0+ docutils==0.22+ email-validator==2.2.0+ exceptiongroup==1.3.0+ fastmcp==2.11.3+ h11==0.16.0+ httpcore==1.0.9+ httpx==0.28.1+ httpx-sse==0.4.1+ idna==3.10+ isodate==0.7.2+ jsonschema==4.25.1+ jsonschema-path==0.3.4+ jsonschema-specifications==2025.4.1+ lazy-object-proxy==1.12.0+ markdown-it-py==4.0.0+ markupsafe==3.0.2+ mcp==1.13.1+ mdurl==0.1.2+ more-itertools==10.7.0+ openapi-core==0.19.5+ openapi-pydantic==0.5.1+ openapi-schema-validator==0.6.3+ openapi-spec-validator==0.7.2+ parse==1.20.2+ pathable==0.4.4+ pycparser==2.22+ pydantic==2.11.7+ pydantic-core==2.33.2+ pydantic-settings==2.10.1+ pygments==2.19.2+ pyperclip==1.9.0+ python-dotenv==1.1.1+ python-multipart==0.0.20+ pyyaml==6.0.2+ referencing==0.36.2+ requests==2.32.5+ rfc3339-validator==0.1.4+ rich==14.1.0+ rich-rst==1.3.1+ rpds-py==0.27.0+ six==1.17.0+ sniffio==1.3.1+ sse-starlette==3.0.2+ starlette==0.47.3+ typing-extensions==4.14.1+ typing-inspection==0.4.1+ urllib3==2.5.0+ uvicorn==0.35.0+ werkzeug==3.1.1
Código do cliente
Agora vamos criar o cliente MCP. Ele também será muito semelhante ao do post anterior (é a última vez que digo isso, juro) streamable MCP, só que além disso guardamos as sessões em um json
para poder recuperar aquelas que ficaram pela metade em um processo do servidor.
%%writefile MCP_resumable_client/client.py#!/usr/bin/env python3"""Client MCP with resumability and session persistence capabilities.Demonstrates how to connect to a resumable server and handle interruptions."""import asyncioimport jsonimport timeimport uuidfrom typing import Any, Dict, List, Optional, Callablefrom dataclasses import dataclass, fieldfrom datetime import datetimefrom pathlib import Pathfrom fastmcp import Clientfrom fastmcp.client.transports import StreamableHttpTransport@dataclassclass TaskProgress:"""Represents the progress of a task."""task_id: strtask_name: strcurrent_step: inttotal_steps: intlast_message: strstarted_at: datetime = field(default_factory=datetime.now)last_update: datetime = field(default_factory=datetime.now)@propertydef progress_percent(self) -> float:"""Calculate progress percentage."""if self.total_steps == 0:return 0.0return (self.current_step / self.total_steps) * 100@dataclassclass SessionState:"""Session state of the client."""client_id: str # Persistent client IDsession_id: str # Session ID of the server (can change)server_url: stractive_tasks: Dict[str, TaskProgress]completed_tasks: List[str]created_at: datetime = field(default_factory=datetime.now)class ResumableProgressHandler:"""Handles progress with resumability capabilities."""def __init__(self, task_name: str, session_state: SessionState):self.task_name = task_nameself.session_state = session_stateself.start_time = time.time()self.last_progress = 0self.task_progress: Optional[TaskProgress] = Nonedef __call__(self, progress: float, total: float, message: str):"""Callback for progress updates."""# Create or update task progressif not self.task_progress:# Try to find task_id from the messagetask_id = self._extract_task_id(message)if not task_id:task_id = f"task_{int(time.time())}"self.task_progress = TaskProgress(task_id=task_id,task_name=self.task_name,current_step=int(progress),total_steps=int(total),last_message=message)self.session_state.active_tasks[task_id] = self.task_progresselse:self.task_progress.current_step = int(progress)self.task_progress.total_steps = int(total)self.task_progress.last_message = messageself.task_progress.last_update = datetime.now()# Display progress visuallyself._display_progress(progress, total, message)# Save session stateself._save_session_state()def _extract_task_id(self, message: str) -> Optional[str]:"""Try to extract task_id from the message."""# Search for patterns like "task_id: abc123" in the messageimport rematch = re.search(r'task[_s]*id[:s]*([a-f0-9]{16})', message.lower())return match.group(1) if match else Nonedef _display_progress(self, progress: float, total: float, message: str):"""Display progress visually."""percentage = (progress / total) * 100 if total > 0 else 0bar_length = 30filled_length = int(bar_length * percentage / 100)bar = '█' * filled_length + '░' * (bar_length - filled_length)elapsed = time.time() - self.start_timeprint(f" 📊 {self.task_name}: |{bar}| {percentage:.1f}% "f"({progress:.0f}/{total:.0f}) - {message} [{elapsed:.1f}s]",end='', flush=True)if progress >= total:print() # New line when completedif self.task_progress:# Move to completedtask_id = self.task_progress.task_idself.session_state.completed_tasks.append(task_id)if task_id in self.session_state.active_tasks:del self.session_state.active_tasks[task_id]self._save_session_state()def _save_session_state(self):"""Save session state."""# In a real case, this would be saved to diskpassclass MCPResumableClient:"""Client MCP with resumability capabilities."""def __init__(self, server_url: str = "http://localhost:8001/mcp/"):self.server_url = server_urlself.transport = Noneself.client = Noneself.session_state: Optional[SessionState] = Noneself.state_file = Path("session_state.json")async def __aenter__(self):"""Initialize connection and load state."""# Load previous state if it existsself._load_session_state()self.transport = StreamableHttpTransport(url=self.server_url,sse_read_timeout=120.0 # Timeout for resumable tasks)self.client = Client(transport=self.transport)await self.client.__aenter__()# Create new session state if it doesn't existif not self.session_state:# Generate unique persistent client IDclient_id = str(uuid.uuid4())[:16] # Use only the first 16 characters# Get real session_id from the servertry:session_info_result = await self.client.call_tool("get_session_info", {})# Extract content from CallToolResultif hasattr(session_info_result, 'content') and session_info_result.content:session_info_text = session_info_result.content[0].textif isinstance(session_info_text, str):session_info = json.loads(session_info_text)else:session_info = session_info_textelse:session_info = {}real_session_id = session_info.get("session_id", f"session_{int(time.time())}")print("🏗️ Creating new session")print(f"🔗 Session ID of the server: {real_session_id}")print(f"🆔 Persistent Client ID: {client_id}")except Exception as e:print(f"⚠️ Could not get session_id from the server: {e}")real_session_id = f"session_{int(time.time())}"self.session_state = SessionState(client_id=client_id,session_id=real_session_id,server_url=self.server_url,active_tasks={},completed_tasks=[])self._save_session_state()else:# Reuse existing client_idprint(f"🆔 Reusing Persistent Client ID: {self.session_state.client_id}")# Verify if the session_id matches the servertry:session_info_result = await self.client.call_tool("get_session_info", {})# Extract content from CallToolResultif hasattr(session_info_result, 'content') and session_info_result.content:session_info_text = session_info_result.content[0].textif isinstance(session_info_text, str):session_info = json.loads(session_info_text)else:session_info = session_info_textelse:session_info = {}server_session_id = session_info.get("session_id")if server_session_id and server_session_id != self.session_state.session_id:print(f"⚠️ Session ID of the server changed: {self.session_state.session_id} → {server_session_id}")self.session_state.session_id = server_session_idself._save_session_state()except Exception as e:print(f"⚠️ Could not verify session_id: {e}")return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):"""Close connection and save state."""if self.client:await self.client.__aexit__(exc_type, exc_val, exc_tb)self._save_session_state()def _load_session_state(self):"""Load session state from disk."""if self.state_file.exists():try:with open(self.state_file, 'r') as f:data = json.load(f)# Reconstruct objectsactive_tasks = {}for task_id, task_data in data.get('active_tasks', {}).items():task_data['started_at'] = datetime.fromisoformat(task_data['started_at'])task_data['last_update'] = datetime.fromisoformat(task_data['last_update'])active_tasks[task_id] = TaskProgress(**task_data)self.session_state = SessionState(client_id=data.get('client_id', str(uuid.uuid4())[:16]), # Generate if not existssession_id=data['session_id'],server_url=data['server_url'],active_tasks=active_tasks,completed_tasks=data.get('completed_tasks', []),created_at=datetime.fromisoformat(data['created_at']))print(f"📂 Session state loaded: {self.session_state.session_id}")except Exception as e:print(f"⚠️ Error loading session state: {e}")self.session_state = Noneelse:print("💬 No session state found, starting new session")self.session_state = Nonedef _save_session_state(self):"""Save session state to disk."""if not self.session_state:returntry:# Convert to serializable formatactive_tasks = {}for task_id, task_progress in self.session_state.active_tasks.items():active_tasks[task_id] = {'task_id': task_progress.task_id,'task_name': task_progress.task_name,'current_step': task_progress.current_step,'total_steps': task_progress.total_steps,'last_message': task_progress.last_message,'started_at': task_progress.started_at.isoformat(),'last_update': task_progress.last_update.isoformat()}data = {'client_id': self.session_state.client_id,'session_id': self.session_state.session_id,'server_url': self.session_state.server_url,'active_tasks': active_tasks,'completed_tasks': self.session_state.completed_tasks,'created_at': self.session_state.created_at.isoformat()}with open(self.state_file, 'w') as f:json.dump(data, f, indent=2)except Exception as e:print(f"⚠️ Error saving session state: {e}")async def test_connection(self) -> bool:"""Test connection to the server."""try:await self.client.ping()print(f"✅ Connection established with the resumable server")return Trueexcept Exception as e:print(f"❌ Connection error: {e}")return Falseasync def call_resumable_tool(self,tool_name: str,parameters: Dict[str, Any],allow_resume: bool = True) -> Dict[str, Any]:"""Call tool with resumability capabilities."""progress_handler = ResumableProgressHandler(tool_name, self.session_state)start_time = time.time()# Add client_id to the parameters if allow_resume is enabledif allow_resume and self.session_state:parameters = {**parameters, 'client_id': self.session_state.client_id}try:print(f"🚀 Executing {tool_name} (resumption: {'✅' if allow_resume else '❌'})")result = await self.client.call_tool(tool_name,parameters,progress_handler=progress_handler)duration = time.time() - start_timeprint(f"✅ {tool_name} completed in {duration:.2f}s")# Extract content from CallToolResultif hasattr(result, 'content') and result.content:if isinstance(result.content[0].text, str):try:return json.loads(result.content[0].text)except json.JSONDecodeError:return {"text": result.content[0].text}else:return result.content[0].textelse:return {"error": "No content in result"}except KeyboardInterrupt:print(f" ⏸️ {tool_name} interrupted by the user")# In a real implementation, here we would pause the taskraiseexcept Exception as e:duration = time.time() - start_timeprint(f"❌ {tool_name} failed after {duration:.2f}s: {e}")raiseasync def list_session_tasks(self) -> Dict[str, Any]:"""List session tasks."""try:parameters = {}if self.session_state:parameters['client_id'] = self.session_state.client_idresult = await self.client.call_tool("list_session_tasks", parameters)# Extract content from CallToolResultif hasattr(result, 'content') and result.content:if isinstance(result.content[0].text, str):try:return json.loads(result.content[0].text)except json.JSONDecodeError:return {"text": result.content[0].text}else:return result.content[0].textelse:return {"error": "No content in result"}except Exception as e:print(f"❌ Error listing session tasks: {e}")return {"error": str(e)}async def pause_task(self, task_id: str) -> Dict[str, Any]:"""Pause a specific task."""try:result = await self.client.call_tool("pause_task", {"task_id": task_id})# Extract content from CallToolResultif hasattr(result, 'content') and result.content:if isinstance(result.content[0].text, str):try:return json.loads(result.content[0].text)except json.JSONDecodeError:return {"text": result.content[0].text}else:return result.content[0].textelse:return {"error": "No content in result"}except Exception as e:print(f"❌ Error pausing task {task_id}: {e}")return {"error": str(e)}async def get_checkpoint_stats(self) -> Dict[str, Any]:"""Get checkpoint statistics."""try:result = await self.client.call_tool("get_checkpoint_stats", {})# Extract content from CallToolResultif hasattr(result, 'content') and result.content:if isinstance(result.content[0].text, str):try:return json.loads(result.content[0].text)except json.JSONDecodeError:return {"text": result.content[0].text}else:return result.content[0].textelse:return {"error": "No content in result"}except Exception as e:print(f"❌ Error getting checkpoint statistics: {e}")return {"error": str(e)}def show_session_summary(self):"""Show session summary."""if not self.session_state:print("❌ No session state available")returnprint(" " + "="*60)print("📋 SESSION SUMMARY")print("="*60)print(f"🆔 Client ID (persistent): {self.session_state.client_id}")print(f"📌 Session ID (server): {self.session_state.session_id}")print(f"🔗 Server: {self.session_state.server_url}")print(f"📅 Created: {self.session_state.created_at.strftime('%Y-%m-%d %H:%M:%S')}")print(f" 🔄 Active Tasks: {len(self.session_state.active_tasks)}")for task_id, task in self.session_state.active_tasks.items():progress = task.progress_percentprint(f" • {task.task_name} ({task_id[:8]}...): {progress:.1f}%")print(f" └─ {task.last_message}")print(f" ✅ Completed Tasks: {len(self.session_state.completed_tasks)}")for task_id in self.session_state.completed_tasks[-5:]: # Last 5print(f" • {task_id[:8]}...")async def demo_resumable_data_processing(client: MCPResumableClient):"""Demo of resumable data processing."""print(" " + "="*60)print("📊 DEMO: Resumable Data Processing")print("="*60)result = await client.call_resumable_tool("resumable_data_processing",{"dataset_name": "customer_data","total_records": 50,"batch_size": 5})if "error" not in result:print(f"📊 Result: {result.get('total_processed', 0)} records processed")print(f"🆔 Task ID: {result.get('task_id', 'N/A')}")async def interactive_demo(client: MCPResumableClient):"""Interactive demo with options."""while True:print(" " + "="*60)print("🎮 INTERACTIVE DEMO - Resumable Client")print("="*60)print("1. Resumable data processing")print("2. List session tasks")print("3. Checkpoint statistics")print("4. Session summary")print("0. Exit")print("-" * 60)try:choice = input("Select an option (0-4): ").strip()if choice == "0":breakelif choice == "1":await demo_resumable_data_processing(client)elif choice == "2":tasks = await client.list_session_tasks()print(f"📋 Session tasks: {json.dumps(tasks, indent=2)}")elif choice == "3":stats = await client.get_checkpoint_stats()print(f"📊 Checkpoint statistics: {json.dumps(stats, indent=2)}")elif choice == "4":client.show_session_summary()else:print("❌ Invalid option")except KeyboardInterrupt:print(" ⏸️ Demo interrupted")breakexcept Exception as e:print(f"❌ Error: {e}")async def run_resumable_demo():"""Run resumable demo."""print("🌟 Resumable Client MCP - Demo")print("="*60)try:async with MCPResumableClient() as client:# Test connectionif not await client.test_connection():print("❌ Could not connect to the resumable server.")print(" Make sure the server is running on port 8001")return# Show session stateclient.show_session_summary()# Run interactive demoawait interactive_demo(client)print(" 🎉 Demo completed")except Exception as e:print(f"❌ Error in the demo: {e}")if __name__ == "__main__":try:asyncio.run(run_resumable_demo())except KeyboardInterrupt:print(" ⏹️ Demo interrupted by the user")except Exception as e:print(f"❌ Error running demo: {e}")
Overwriting MCP_resumable_client/client.py
Execução
Vamos iniciar o servidor, iniciar o cliente, solicitar uma tarefa ao servidor, parar o cliente e o servidor para simular que a conexão foi perdida e, em seguida, reiniciar o servidor e o cliente para retomar a tarefa de onde paramos.
Primeira execução - processo interrompido
Antes de executar nada, vamos ver o conteúdo das pastas do servidor e do cliente.
Vamos ver o que há na pasta do servidor.
!cd MCP_resumable_server && ls -lha
total 336drwxr-xr-x@ 11 macm1 staff 352B Aug 25 10:26 .drwxr-xr-x 112 macm1 staff 3.5K Aug 25 09:56 ..-rw-r--r--@ 1 macm1 staff 5B Aug 25 09:37 .python-versiondrwxr-xr-x@ 8 macm1 staff 256B Aug 25 09:38 .venv-rw-r--r--@ 1 macm1 staff 0B Aug 25 09:37 README.mddrwxr-xr-x@ 3 macm1 staff 96B Aug 25 09:42 __pycache__-rw-r--r--@ 1 macm1 staff 8.4K Aug 25 09:40 checkpoint_manager.py-rw-r--r--@ 1 macm1 staff 98B Aug 25 09:37 main.py-rw-r--r--@ 1 macm1 staff 213B Aug 25 09:38 pyproject.toml-rw-r--r--@ 1 macm1 staff 9.2K Aug 25 09:42 server.py-rw-r--r--@ 1 macm1 staff 128K Aug 25 09:38 uv.lock
Temos os arquivos .python-version
, .venv
, README.md
, __pycache__
, main.py
, pyproject.toml
e uv.lock
, que são os arquivos criados pelo uv
ao criar o ambiente virtual. E temos os arquivos checkpoint_manager.py
e server.py
, que são os arquivos que criamos para o servidor MCP resumable.
Agora vamos ver quais arquivos estão na pasta do cliente.
!cd MCP_resumable_client && ls -lha
total 336drwxr-xr-x@ 9 macm1 staff 288B Aug 25 10:26 .drwxr-xr-x 112 macm1 staff 3.5K Aug 25 09:56 ..-rw-r--r--@ 1 macm1 staff 5B Aug 25 09:56 .python-versiondrwxr-xr-x@ 8 macm1 staff 256B Aug 25 09:57 .venv-rw-r--r--@ 1 macm1 staff 0B Aug 25 09:56 README.md-rw-r--r--@ 1 macm1 staff 20K Aug 25 10:24 client.py-rw-r--r--@ 1 macm1 staff 98B Aug 25 09:56 main.py-rw-r--r--@ 1 macm1 staff 190B Aug 25 09:57 pyproject.toml-rw-r--r--@ 1 macm1 staff 128K Aug 25 09:57 uv.lock
Como antes, temos os arquivos .python-version
, .venv
, README.md
, main.py
, pyproject.toml
e uv.lock
, que são os arquivos criados pelo uv
ao criar o ambiente virtual. E temos o arquivo client.py
, que é o arquivo que criamos para o cliente MCP resumable.
Depois de ver isso, reiniciamos o servidor.
!cd MCP_resumable_server && source .venv/bin/activate && uv run server.py
🚀 Starting MCP server with resumability at 127.0.0.1:8001✅ Resumable server ready at http://127.0.0.1:8001/mcp/🔧 Tools with resumability:- resumable_data_processing: Processing with checkpoints- pause_task: Pause tasks- list_session_tasks: List session tasks- get_checkpoint_stats: Checkpoint statistics- get_session_info: Current session informationINFO: Started server process [47049]INFO: Waiting for application startup.INFO: Application startup complete.INFO: Uvicorn running on http://127.0.0.1:8001 (Press CTRL+C to quit)
Agora executamos o cliente, selecionamos a opção 1 (Processamento de dados retomável) e o interrompemos no meio do processo.
!cd MCP_resumable_client && source .venv/bin/activate && uv run client.py
🌟 Resumable Client MCP - Demo============================================================💬 No session state found, starting new session🏗️ Creating new session🔗 Session ID of the server: 6bf64139282e41e0a2233ec92647bf02🆔 Persistent Client ID: f0af843a-9b0e-42✅ Connection established with the resumable server============================================================📋 SESSION SUMMARY============================================================🆔 Client ID (persistent): f0af843a-9b0e-42📌 Session ID (server): 6bf64139282e41e0a2233ec92647bf02🔗 Server: http://localhost:8001/mcp/📅 Created: 2025-08-25 10:34:53🔄 Active Tasks: 0✅ Completed Tasks: 0============================================================🎮 INTERACTIVE DEMO - Resumable Client============================================================1. Resumable data processing2. List session tasks3. Checkpoint statistics4. Session summary0. Exit------------------------------------------------------------Select an option (0-4): 1============================================================📊 DEMO: Resumable Data Processing============================================================🚀 Executing resumable_data_processing (resumption: ✅)[08/25/25 10:35:19] INFO Server log: logging.py:40🆕 Starting new processing of 50 records📊 resumable_data_processing: |███░░░░░░░░░░░░░░░░░░░░░░░░░░░| 10.0% (5/50) - Processed 5/50 records [1.0s]📊 resumable_data_processing: |██████░░░░░░░░░░░░░░░░░░░░░░░░| 20.0% (10/50) - Processed 10/50 records [2.0s]📊 resumable_data_processing: |█████████░░░░░░░░░░░░░░░░░░░░░| 30.0% (15/50) - Processed 15/50 records [3.0s]^C⏹️ Demo interrupted by the user
Também paramos o servidor.
Agora que paramos o servidor e o cliente, vamos ver novamente o que há em cada pasta.
Primeiro, vamos ver o que há na pasta do servidor.
!cd MCP_resumable_server && ls -lha
total 336drwxr-xr-x@ 12 macm1 staff 384B Aug 25 10:33 .drwxr-xr-x 112 macm1 staff 3.5K Aug 25 09:56 ..-rw-r--r--@ 1 macm1 staff 5B Aug 25 09:37 .python-versiondrwxr-xr-x@ 8 macm1 staff 256B Aug 25 09:38 .venv-rw-r--r--@ 1 macm1 staff 0B Aug 25 09:37 README.mddrwxr-xr-x@ 3 macm1 staff 96B Aug 25 09:42 __pycache__-rw-r--r--@ 1 macm1 staff 8.4K Aug 25 09:40 checkpoint_manager.pydrwxr-xr-x@ 4 macm1 staff 128B Aug 25 10:35 checkpoints-rw-r--r--@ 1 macm1 staff 98B Aug 25 09:37 main.py-rw-r--r--@ 1 macm1 staff 213B Aug 25 09:38 pyproject.toml-rw-r--r--@ 1 macm1 staff 9.2K Aug 25 09:42 server.py-rw-r--r--@ 1 macm1 staff 128K Aug 25 09:38 uv.lock
Podemos ver que, além de tudo o que havia antes, agora há também uma pasta chamada “checkpoints” que contém os pontos de verificação das tarefas que foram executadas.
!cd MCP_resumable_server/checkpoints && ls -lha
total 8drwxr-xr-x@ 4 macm1 staff 128B Aug 25 10:35 .drwxr-xr-x@ 12 macm1 staff 384B Aug 25 10:33 ..-rw-r--r--@ 1 macm1 staff 1.1K Aug 25 10:35 checkpoints.jsondrwxr-xr-x@ 3 macm1 staff 96B Aug 25 10:35 data
Se observarmos o conteúdo do json
, podemos ver
"current_step": 15,
"total_steps": 50,
!cd MCP_resumable_server/checkpoints && cat checkpoints.json
{"9d7bc504c50e0bce": {"task_id": "9d7bc504c50e0bce","session_id": "864eec3a-be1b-43","task_name": "resumable_data_processing","parameters": {"dataset_name": "customer_data","total_records": 50,"batch_size": 5},"current_step": 15,"total_steps": 50,"status": "failed","created_at": "2025-08-25T08:35:19.349142+00:00","updated_at": "2025-08-25T08:35:23.366672+00:00","data": {"last_batch": [{"record_id": "customer_data_000015","processed_at": "step_15","value": 88},{"record_id": "customer_data_000016","processed_at": "step_16","value": 17},{"record_id": "customer_data_000017","processed_at": "step_17","value": 596},{"record_id": "customer_data_000018","processed_at": "step_18","value": 693},{"record_id": "customer_data_000019","processed_at": "step_19","value": 34}]},"error_message": ""}}
Ou seja, ficamos na etapa 15 de 50, que, se você observar a saída do cliente, é onde interrompemos o processo.
📊 resumable_data_processing: |███░░░░░░░░░░░░░░░░░░░░░░░░░░░| 10.0% (5/50) - Processed 5/50 records [1.0s]
📊 resumable_data_processing: |██████░░░░░░░░░░░░░░░░░░░░░░░░| 20.0% (10/50) - Processed 10/50 records [2.0s]
📊 resumable_data_processing: |█████████░░░░░░░░░░░░░░░░░░░░░| 30.0% (15/50) - Processed 15/50 records [3.0s]
^C
Agora vemos o conteúdo da pasta do cliente e podemos ver que agora existe um arquivo chamado session_state.json
.
!cd MCP_resumable_client && ls -lha
total 344drwxr-xr-x@ 10 macm1 staff 320B Aug 25 10:35 .drwxr-xr-x 112 macm1 staff 3.5K Aug 25 09:56 ..-rw-r--r--@ 1 macm1 staff 5B Aug 25 09:56 .python-versiondrwxr-xr-x@ 8 macm1 staff 256B Aug 25 09:57 .venv-rw-r--r--@ 1 macm1 staff 0B Aug 25 09:56 README.md-rw-r--r--@ 1 macm1 staff 20K Aug 25 10:24 client.py-rw-r--r--@ 1 macm1 staff 98B Aug 25 09:56 main.py-rw-r--r--@ 1 macm1 staff 190B Aug 25 09:57 pyproject.toml-rw-r--r--@ 1 macm1 staff 546B Aug 25 10:35 session_state.json-rw-r--r--@ 1 macm1 staff 128K Aug 25 09:57 uv.lock
Vamos ver o que tem dentro.
!cd MCP_resumable_client && cat session_state.json
{"client_id": "864eec3a-be1b-43","session_id": "d34ff655d6464fb4ae5eeb64c62989b0","server_url": "http://localhost:8001/mcp/","active_tasks": {"task_1756110920": {"task_id": "task_1756110920","task_name": "resumable_data_processing","current_step": 15,"total_steps": 50,"last_message": "Processed 15/50 records","started_at": "2025-08-25T10:35:20.353521","last_update": "2025-08-25T10:35:22.364507"}},"completed_tasks": [],"created_at": "2025-08-25T10:35:16.230926"}
Assim como no json
do servidor, vemos
"current_step": 15,
"total_steps": 50,
Também ficou registrado que ficamos na etapa 15 de 50, mas o mais importante é
"task_id": "task_1756110920",
Onde guardamos o ID da tarefa para poder solicitar ao servidor que a retome.
Segunda execução - processo retomado
Se agora reiniciarmos o servidor
!cd MCP_resumable_server && source .venv/bin/activate && uv run server.py
🚀 Starting MCP server with resumability at 127.0.0.1:8001✅ Resumable server ready at http://127.0.0.1:8001/mcp/🔧 Tools with resumability:- resumable_data_processing: Processing with checkpoints- pause_task: Pause tasks- list_session_tasks: List session tasks- get_checkpoint_stats: Checkpoint statistics- get_session_info: Current session informationINFO: Started server process [52338]INFO: Waiting for application startup.INFO: Application startup complete.INFO: Uvicorn running on http://127.0.0.1:8001 (Press CTRL+C to quit)
E voltamos a executar o cliente
!cd MCP_resumable_client && source .venv/bin/activate && uv run client.py
🌟 Resumable Client MCP - Demo============================================================📂 Session state loaded: d34ff655d6464fb4ae5eeb64c62989b0🆔 Reusing Persistent Client ID: 864eec3a-be1b-43⚠️ Session ID of the server changed: d34ff655d6464fb4ae5eeb64c62989b0 → 004bc91edde144898938136d0d2b5041✅ Connection established with the resumable server============================================================📋 SESSION SUMMARY============================================================🆔 Client ID (persistent): 864eec3a-be1b-43📌 Session ID (server): 004bc91edde144898938136d0d2b5041🔗 Server: http://localhost:8001/mcp/📅 Created: 2025-08-25 10:35:16🔄 Active Tasks: 1• resumable_data_processing (task_175...): 30.0%└─ Processed 15/50 records✅ Completed Tasks: 0============================================================🎮 INTERACTIVE DEMO - Resumable Client============================================================1. Resumable data processing2. List session tasks3. Checkpoint statistics4. Session summary0. Exit------------------------------------------------------------Select an option (0-4):
Podemos ver que diz
🔄 Active Tasks: 1
• resumable_data_processing (task_175...): 30.0%
└─ Processed 15/50 records
Foi possível recuperar as informações que ficaram na etapa 15 de 50 da tarefa com ID 1756110920
.
Agora, selecionamos novamente a opção 1 e veremos que continua a partir do passo 15.
!cd MCP_resumable_client && source .venv/bin/activate && uv run client.py
🌟 Resumable Client MCP - Demo============================================================📂 Session state loaded: 004bc91edde144898938136d0d2b5041🆔 Reusing Persistent Client ID: 864eec3a-be1b-43⚠️ Session ID of the server changed: 004bc91edde144898938136d0d2b5041 → 4b93c36446504a3d92467d39a8cba589✅ Connection established with the resumable server============================================================📋 SESSION SUMMARY============================================================🆔 Client ID (persistent): 864eec3a-be1b-43📌 Session ID (server): 4b93c36446504a3d92467d39a8cba589🔗 Server: http://localhost:8001/mcp/📅 Created: 2025-08-25 10:35:16🔄 Active Tasks: 1• resumable_data_processing (task_175...): 30.0%└─ Processed 15/50 records✅ Completed Tasks: 0============================================================🎮 INTERACTIVE DEMO - Resumable Client============================================================1. Resumable data processing2. List session tasks3. Checkpoint statistics4. Session summary0. Exit------------------------------------------------------------Select an option (0-4): 1============================================================📊 DEMO: Resumable Data Processing============================================================🚀 Executing resumable_data_processing (resumption: ✅)[08/25/25 10:57:31] INFO Server log: 🔄 Resuming logging.py:40processing from record 15📊 resumable_data_processing: |████████████░░░░░░░░░░░░░░░░░░| 40.0% (20/50) - Processed 20/50 records [1.0s]📊 resumable_data_processing: |███████████████░░░░░░░░░░░░░░░| 50.0% (25/50) - Processed 25/50 records [2.0s]📊 resumable_data_processing: |██████████████████░░░░░░░░░░░░| 60.0% (30/50) - Processed 30/50 records [3.0s]📊 resumable_data_processing: |█████████████████████░░░░░░░░░| 70.0% (35/50) - Processed 35/50 records [4.0s]📊 resumable_data_processing: |████████████████████████░░░░░░| 80.0% (40/50) - Processed 40/50 records [5.0s]📊 resumable_data_processing: |███████████████████████████░░░| 90.0% (45/50) - Processed 45/50 records [6.0s]📊 resumable_data_processing: |██████████████████████████████| 100.0% (50/50) - Processed 50/50 records [7.0s][08/25/25 10:57:38] INFO Server log: ✅ Processing logging.py:40completed: 55 records✅ resumable_data_processing completed in 7.05s📊 Result: 55 records processed🆔 Task ID: 88e68f3bdf65cd22============================================================🎮 INTERACTIVE DEMO - Resumable Client============================================================1. Resumable data processing2. List session tasks3. Checkpoint statistics4. Session summary0. Exit------------------------------------------------------------Select an option (0-4): 0🎉 Demo completed
Vemos que aparece “processando a partir do registro 15”, ou seja, ele começou de onde parou.
Desta forma, já podemos ter servidores e clientes MCP que processam tarefas longas e que podem ser retomadas caso sejam interrompidas.
Isso também é muito útil quando a tarefa é muito longa, pois não é necessário manter o cliente sempre em execução e em espera, mas podemos fazer com que o cliente solicite a tarefa ao servidor e, de tempos em tempos, solicite o status da tarefa.