Resumable MCP: How to Create Servers and Clients with Automatic Checkpoints


In the previous post, streamable MCP, we saw how to have an MCP server send progress information about the task it is running, so that the client can show it to the user.

This is useful when the task run by the MCP server takes a long time.

But what if the task is interrupted, for example by a server crash or a lost connection? We would have to ask the MCP server to start the whole task over.

To avoid that, we are going to see how to create an MCP server and client that can resume a task in progress. That way, if it goes down for any reason, the task can be picked up where it left off.
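Before getting into MCP, the core idea can be sketched in a few lines of plain Python. This is only an illustration under stated assumptions: `run_with_checkpoint`, the `state.json` file and the `fail_at` parameter are hypothetical names used to simulate a crash, and they are not part of the server built in this post.

```python
import json
import tempfile
from pathlib import Path


def run_with_checkpoint(items, state_file: Path, fail_at=None):
    """Process items in order, saving the next index to resume from after each one."""
    # Load the saved position, or start from the beginning
    if state_file.exists():
        state = json.loads(state_file.read_text())
    else:
        state = {"next": 0, "done": []}

    for i in range(state["next"], len(items)):
        if fail_at is not None and i == fail_at:
            raise RuntimeError("simulated crash")  # hypothetical interruption
        state["done"].append(items[i] * 2)  # stand-in for the real work
        state["next"] = i + 1
        state_file.write_text(json.dumps(state))  # checkpoint after every item
    return state["done"]


state_path = Path(tempfile.mkdtemp()) / "state.json"

try:
    # First run is interrupted when it reaches index 2
    run_with_checkpoint([1, 2, 3, 4], state_path, fail_at=2)
except RuntimeError:
    pass

# Second run reads the checkpoint and continues at index 2
resumed = run_with_checkpoint([1, 2, 3, 4], state_path)
print(resumed)
```

The second call does not redo the first two items: it loads the saved state and only processes what was left. The server and client in this post follow the same pattern, with the checkpoint stored on the server side.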

Resumable MCP server

This server is very similar to the one we built in the streamable MCP post, except that it also creates checkpoints so that a task can be resumed if it is interrupted.

So let's see how to implement it.

Implementing the resumable MCP server

Create the virtual environment

First we create the folder where we are going to develop it.

	
!mkdir MCP_resumable_server

We create the project with uv

	
!cd MCP_resumable_server && uv init .
	
Initialized project `mcp-resumable-server` at `/Users/macm1/Documents/web/portafolio/posts/MCP_resumable_server`

We create the virtual environment

	
!cd MCP_resumable_server && uv venv
	
Using CPython 3.12.8
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate

We install the required libraries

	
!cd MCP_resumable_server && uv add fastmcp uvicorn
	
Resolved 64 packages in 548ms
Prepared 1 package in 134ms
Installed 61 packages in 152ms
+ annotated-types==0.7.0
+ anyio==4.10.0
+ attrs==25.3.0
+ authlib==1.6.2
+ certifi==2025.8.3
+ cffi==1.17.1
+ charset-normalizer==3.4.3
+ click==8.2.1
+ cryptography==45.0.6
+ cyclopts==3.22.5
+ dnspython==2.7.0
+ docstring-parser==0.17.0
+ docutils==0.22
+ email-validator==2.2.0
+ exceptiongroup==1.3.0
+ fastmcp==2.11.3
+ h11==0.16.0
+ httpcore==1.0.9
+ httpx==0.28.1
+ httpx-sse==0.4.1
+ idna==3.10
+ isodate==0.7.2
+ jsonschema==4.25.1
+ jsonschema-path==0.3.4
+ jsonschema-specifications==2025.4.1
+ lazy-object-proxy==1.12.0
+ markdown-it-py==4.0.0
+ markupsafe==3.0.2
+ mcp==1.13.1
+ mdurl==0.1.2
+ more-itertools==10.7.0
+ openapi-core==0.19.5
+ openapi-pydantic==0.5.1
+ openapi-schema-validator==0.6.3
+ openapi-spec-validator==0.7.2
+ parse==1.20.2
+ pathable==0.4.4
+ pycparser==2.22
+ pydantic==2.11.7
+ pydantic-core==2.33.2
+ pydantic-settings==2.10.1
+ pygments==2.19.2
+ pyperclip==1.9.0
+ python-dotenv==1.1.1
+ python-multipart==0.0.20
+ pyyaml==6.0.2
+ referencing==0.36.2
+ requests==2.32.5
+ rfc3339-validator==0.1.4
+ rich==14.1.0
+ rich-rst==1.3.1
+ rpds-py==0.27.0
+ six==1.17.0
+ sniffio==1.3.1
+ sse-starlette==3.0.2
+ starlette==0.47.3
+ typing-extensions==4.14.1
+ typing-inspection==0.4.1
+ urllib3==2.5.0
+ uvicorn==0.35.0
+ werkzeug==3.1.1

Server code

Now let's write the code for the resumable MCP server.

Checkpoint manager

As mentioned, the main difference from the server in the previous streamable MCP post is that this one saves its state in checkpoints so that a task can be resumed if it is interrupted. So let's create the checkpoint manager.

	
%%writefile MCP_resumable_server/checkpoint_manager.py
#!/usr/bin/env python3
"""
Checkpoint and resumability system for MCP streaming tasks.
Allows saving the state of long tasks and resuming them from where they were interrupted.
"""
import json
import pickle
import hashlib
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, asdict
from enum import Enum


class TaskStatus(Enum):
    """Task states."""
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


@dataclass
class TaskCheckpoint:
    """Represents a checkpoint of a task."""
    task_id: str
    session_id: str
    task_name: str
    parameters: Dict[str, Any]
    current_step: int
    total_steps: int
    status: TaskStatus
    created_at: datetime
    updated_at: datetime
    data: Dict[str, Any]  # Task-specific state
    error_message: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to serializable dictionary."""
        data = asdict(self)
        data['status'] = data['status'].value
        data['created_at'] = data['created_at'].isoformat()
        data['updated_at'] = data['updated_at'].isoformat()
        return data

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'TaskCheckpoint':
        """Create from dictionary."""
        data = dict(data)  # Copy so the caller's dictionary is not modified
        data['status'] = TaskStatus(data['status'])
        data['created_at'] = datetime.fromisoformat(data['created_at'])
        data['updated_at'] = datetime.fromisoformat(data['updated_at'])
        return cls(**data)


class CheckpointManager:
    """Checkpoint manager for streaming tasks."""

    def __init__(self, storage_dir: str = "checkpoints"):
        self.storage_dir = Path(storage_dir)
        self.storage_dir.mkdir(exist_ok=True)
        self.checkpoints_file = self.storage_dir / "checkpoints.json"
        self.data_dir = self.storage_dir / "data"
        self.data_dir.mkdir(exist_ok=True)
        self._checkpoints: Dict[str, TaskCheckpoint] = {}
        self._load_checkpoints()

    def _load_checkpoints(self):
        """Load checkpoints from disk."""
        if self.checkpoints_file.exists():
            try:
                with open(self.checkpoints_file, 'r') as f:
                    data = json.load(f)
                self._checkpoints = {
                    task_id: TaskCheckpoint.from_dict(checkpoint_data)
                    for task_id, checkpoint_data in data.items()
                }
            except Exception as e:
                print(f"❌ Error loading checkpoints: {e}")
                self._checkpoints = {}

    def _save_checkpoints(self):
        """Save checkpoints to disk."""
        try:
            data = {
                task_id: checkpoint.to_dict()
                for task_id, checkpoint in self._checkpoints.items()
            }
            with open(self.checkpoints_file, 'w') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            print(f"❌ Error saving checkpoints: {e}")

    def generate_task_id(self, session_id: str, task_name: str, parameters: Dict[str, Any]) -> str:
        """Generate unique ID for a task."""
        # Create hash based on session, task and parameters
        data = f"{session_id}-{task_name}-{json.dumps(parameters, sort_keys=True)}"
        return hashlib.sha256(data.encode()).hexdigest()[:16]

    def create_checkpoint(
        self,
        session_id: str,
        task_name: str,
        parameters: Dict[str, Any],
        total_steps: int,
        initial_data: Optional[Dict[str, Any]] = None
    ) -> str:
        """Create new checkpoint."""
        task_id = self.generate_task_id(session_id, task_name, parameters)
        checkpoint = TaskCheckpoint(
            task_id=task_id,
            session_id=session_id,
            task_name=task_name,
            parameters=parameters,
            current_step=0,
            total_steps=total_steps,
            status=TaskStatus.PENDING,
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
            data=initial_data or {}
        )
        self._checkpoints[task_id] = checkpoint
        self._save_checkpoints()
        return task_id

    def update_checkpoint(
        self,
        task_id: str,
        current_step: int,
        status: TaskStatus,
        data: Optional[Dict[str, Any]] = None,
        error_message: Optional[str] = None
    ) -> bool:
        """Update existing checkpoint."""
        if task_id not in self._checkpoints:
            return False
        checkpoint = self._checkpoints[task_id]
        checkpoint.current_step = current_step
        checkpoint.status = status
        checkpoint.updated_at = datetime.now(timezone.utc)
        if data is not None:
            checkpoint.data.update(data)
        if error_message is not None:
            checkpoint.error_message = error_message
        self._save_checkpoints()
        return True

    def get_checkpoint(self, task_id: str) -> Optional[TaskCheckpoint]:
        """Get checkpoint by ID."""
        return self._checkpoints.get(task_id)

    def find_resumable_task(
        self,
        session_id: str,
        task_name: str,
        parameters: Dict[str, Any]
    ) -> Optional[TaskCheckpoint]:
        """Find resumable task with the same parameters."""
        task_id = self.generate_task_id(session_id, task_name, parameters)
        checkpoint = self._checkpoints.get(task_id)
        if checkpoint and checkpoint.status in [TaskStatus.PAUSED, TaskStatus.RUNNING]:
            return checkpoint
        return None

    def get_session_checkpoints(self, session_id: str) -> List[TaskCheckpoint]:
        """Get all checkpoints of a session."""
        return [
            checkpoint for checkpoint in self._checkpoints.values()
            if checkpoint.session_id == session_id
        ]

    def save_task_data(self, task_id: str, data: Any) -> bool:
        """Save specific task data."""
        try:
            data_file = self.data_dir / f"{task_id}.pkl"
            with open(data_file, 'wb') as f:
                pickle.dump(data, f)
            return True
        except Exception as e:
            print(f"❌ Error saving task data {task_id}: {e}")
            return False

    def load_task_data(self, task_id: str) -> Any:
        """Load specific task data."""
        try:
            data_file = self.data_dir / f"{task_id}.pkl"
            if data_file.exists():
                with open(data_file, 'rb') as f:
                    return pickle.load(f)
            return None
        except Exception as e:
            print(f"❌ Error loading task data {task_id}: {e}")
            return None

    def delete_checkpoint(self, task_id: str) -> bool:
        """Delete checkpoint and associated data."""
        try:
            # Delete checkpoint
            if task_id in self._checkpoints:
                del self._checkpoints[task_id]
                self._save_checkpoints()
            # Delete data
            data_file = self.data_dir / f"{task_id}.pkl"
            if data_file.exists():
                data_file.unlink()
            return True
        except Exception as e:
            print(f"❌ Error deleting checkpoint {task_id}: {e}")
            return False

    def cleanup_old_checkpoints(self, max_age_days: int = 7) -> int:
        """Clean up old checkpoints."""
        # Subtract a timedelta so month boundaries are handled correctly;
        # replace(day=day - max_age_days) raises ValueError when the result is < 1
        cutoff_date = datetime.now(timezone.utc) - timedelta(days=max_age_days)
        to_delete = []
        for task_id, checkpoint in self._checkpoints.items():
            if (checkpoint.status in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED]
                    and checkpoint.updated_at < cutoff_date):
                to_delete.append(task_id)
        for task_id in to_delete:
            self.delete_checkpoint(task_id)
        return len(to_delete)

    def get_stats(self) -> Dict[str, int]:
        """Get checkpoint statistics."""
        stats = {status.value: 0 for status in TaskStatus}
        for checkpoint in self._checkpoints.values():
            stats[checkpoint.status.value] += 1
        return stats


# Global singleton for the checkpoint manager
checkpoint_manager = CheckpointManager()
	
Writing MCP_resumable_server/checkpoint_manager.py
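Resuming depends on the task ID being deterministic: the same client, task name and parameters must always map to the same checkpoint. The sketch below reproduces the hashing done in `generate_task_id` as a standalone function so the behavior can be checked in isolation. The name `task_id_for` and the `"client-1"` identifier are made up for this example.

```python
import hashlib
import json


def task_id_for(session_id: str, task_name: str, parameters: dict) -> str:
    """Same recipe as generate_task_id: hash of session, task name and sorted parameters."""
    raw = f"{session_id}-{task_name}-{json.dumps(parameters, sort_keys=True)}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]


# Same parameters in a different key order give the same ID (sort_keys=True)
a = task_id_for("client-1", "resumable_data_processing", {"total_records": 50, "batch_size": 5})
b = task_id_for("client-1", "resumable_data_processing", {"batch_size": 5, "total_records": 50})

# Changing any parameter gives a different ID, so it is treated as a new task
c = task_id_for("client-1", "resumable_data_processing", {"batch_size": 10, "total_records": 50})

print(a == b, a == c)
```

One consequence of this design is that a call with different parameters never picks up an existing checkpoint: only an identical request from the same identifier can resume.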
Server

Now let's create the server code.

	
%%writefile MCP_resumable_server/server.py
#!/usr/bin/env python3
"""
MCP server with resumability and checkpoints.
Demonstrates how to implement tasks that can be paused and resumed.
"""
import asyncio
import uvicorn
import random
from typing import Dict, List, Optional, Any
from fastmcp import FastMCP, Context
from fastmcp.server.http import create_streamable_http_app
from checkpoint_manager import checkpoint_manager, TaskStatus, TaskCheckpoint

# Create instance of the MCP server
mcp = FastMCP(
    name="Resumable Server",
    instructions="MCP server with resumability and checkpoints for long tasks"
)


@mcp.tool
async def resumable_data_processing(
    dataset_name: str = "default_dataset",
    total_records: int = 100,
    batch_size: int = 10,
    client_id: Optional[str] = None,  # Persistent client_id sent by the client
    context: Context = None
) -> Dict[str, Any]:
    """
    Resumable data processing with automatic checkpoints.

    Args:
        dataset_name: Name of the dataset to process
        total_records: Total records to process
        batch_size: Size of the batch per iteration
        client_id: Persistent client id for resumability
    """
    if not context:
        return {"error": "Requires context for resumability"}

    # Use persistent client id if available, otherwise use session_id
    persistent_id = client_id if client_id else context.session_id
    if not persistent_id:
        return {"error": "Requires client_id or session_id for resumability"}

    task_name = "resumable_data_processing"
    parameters = {
        "dataset_name": dataset_name,
        "total_records": total_records,
        "batch_size": batch_size
    }

    # Search for existing task to resume - include interrupted tasks
    task_id = checkpoint_manager.generate_task_id(persistent_id, task_name, parameters)
    existing_checkpoint = checkpoint_manager.get_checkpoint(task_id)

    if existing_checkpoint and existing_checkpoint.current_step > 0:
        # Resume from checkpoint
        await context.info(f"🔄 Resuming processing from record {existing_checkpoint.current_step}")
        start_record = existing_checkpoint.current_step
        processed_data = checkpoint_manager.load_task_data(task_id) or []
        # Mark as running
        checkpoint_manager.update_checkpoint(
            task_id, start_record, TaskStatus.RUNNING
        )
    else:
        # New task
        await context.info(f"🆕 Starting new processing of {total_records} records")
        if not existing_checkpoint:
            task_id = checkpoint_manager.create_checkpoint(
                persistent_id, task_name, parameters, total_records
            )
        start_record = 0
        processed_data = []
        # Mark as running
        checkpoint_manager.update_checkpoint(
            task_id, 0, TaskStatus.RUNNING
        )

    try:
        # Process in batches
        for record_num in range(start_record, total_records, batch_size):
            batch_end = min(record_num + batch_size, total_records)

            # Simulate batch processing
            await asyncio.sleep(1)  # Simulate work

            # Process batch
            batch_results = []
            for i in range(record_num, batch_end):
                # Simulate record processing
                result = {
                    "record_id": f"{dataset_name}_{i:06d}",
                    "processed_at": f"step_{i}",
                    "value": random.randint(1, 1000)
                }
                batch_results.append(result)
            processed_data.extend(batch_results)

            # Save checkpoint for each batch
            checkpoint_manager.update_checkpoint(
                task_id,
                batch_end,
                TaskStatus.RUNNING,
                {"last_batch": batch_results}
            )
            # Save processed data
            checkpoint_manager.save_task_data(task_id, processed_data)

            # Report progress
            await context.report_progress(
                progress=batch_end,
                total=total_records,
                message=f"Processed {batch_end}/{total_records} records"
            )
            await context.debug(f"Batch {record_num}-{batch_end-1} completed")

        # Mark as completed
        checkpoint_manager.update_checkpoint(
            task_id, total_records, TaskStatus.COMPLETED
        )
        await context.info(f"✅ Processing completed: {len(processed_data)} records")
        return {
            "task_id": task_id,
            "dataset_name": dataset_name,
            "total_processed": len(processed_data),
            "records": processed_data[:5],  # Show first 5
            "status": "completed"
        }
    except Exception as e:
        # Mark as failed
        checkpoint_manager.update_checkpoint(
            task_id, record_num, TaskStatus.FAILED, error_message=str(e)
        )
        await context.error(f"❌ Error in processing: {e}")
        raise


@mcp.tool
async def pause_task(
    task_id: str,
    context: Context = None
) -> Dict[str, str]:
    """
    Pause a running task.

    Args:
        task_id: ID of the task to pause
    """
    checkpoint = checkpoint_manager.get_checkpoint(task_id)
    if not checkpoint:
        return {"error": f"Task {task_id} not found"}
    if checkpoint.status != TaskStatus.RUNNING:
        return {"error": f"Task {task_id} is not running"}

    success = checkpoint_manager.update_checkpoint(
        task_id, checkpoint.current_step, TaskStatus.PAUSED
    )
    if success:
        if context:
            await context.info(f"⏸️ Task {task_id} paused")
        return {"message": f"Task {task_id} paused correctly"}
    else:
        return {"error": f"Could not pause task {task_id}"}


@mcp.tool
async def list_session_tasks(
    client_id: Optional[str] = None,  # Persistent client_id sent by the client
    context: Context = None
) -> Dict[str, Any]:
    """
    List all tasks of the current session or the specified client.

    Args:
        client_id: Persistent client id (optional)
    """
    if not context:
        return {"error": "Requires context"}

    # Use persistent client id if available, otherwise use session_id
    persistent_id = client_id if client_id else context.session_id
    if not persistent_id:
        return {"error": "Requires client_id or session_id"}

    checkpoints = checkpoint_manager.get_session_checkpoints(persistent_id)
    tasks = []
    for checkpoint in checkpoints:
        task_info = {
            "task_id": checkpoint.task_id,
            "task_name": checkpoint.task_name,
            "status": checkpoint.status.value,
            "progress": f"{checkpoint.current_step}/{checkpoint.total_steps}",
            "progress_percent": round((checkpoint.current_step / checkpoint.total_steps) * 100, 1) if checkpoint.total_steps > 0 else 0,
            "created_at": checkpoint.created_at.isoformat(),
            "updated_at": checkpoint.updated_at.isoformat()
        }
        tasks.append(task_info)

    return {
        "session_id": context.session_id,
        "persistent_id": persistent_id,
        "total_tasks": len(tasks),
        "tasks": tasks
    }


@mcp.tool
async def get_checkpoint_stats(context: Context = None) -> Dict[str, Any]:
    """
    Get global checkpoint statistics.
    """
    stats = checkpoint_manager.get_stats()
    return {
        "checkpoint_stats": stats,
        "total_checkpoints": sum(stats.values())
    }


@mcp.tool
async def get_session_info(context: Context = None) -> Dict[str, Any]:
    """
    Get information about the current session.
    """
    if not context:
        return {"error": "No context available"}
    return {
        "session_id": context.session_id,
        "request_id": context.request_id,
        "client_id": context.client_id
    }


async def run_resumable_server(host: str = "127.0.0.1", port: int = 8001):
    """Run server with resumability capabilities."""
    print(f"🚀 Starting MCP server with resumability at {host}:{port}")

    # Create application
    app = create_streamable_http_app(
        server=mcp,
        streamable_http_path="/mcp/",
        stateless_http=False,  # IMPORTANT: stateful for sessions
        debug=True
    )
    config = uvicorn.Config(
        app=app,
        host=host,
        port=port,
        log_level="info",
        access_log=False
    )
    server = uvicorn.Server(config)

    print(f"✅ Resumable server ready at http://{host}:{port}/mcp/")
    print("🔧 Tools with resumability:")
    print("  - resumable_data_processing: Processing with checkpoints")
    # print("  - large_file_download: Download resumable")
    # print("  - machine_learning_training: ML training resumable")
    print("  - pause_task: Pause tasks")
    print("  - list_session_tasks: List session tasks")
    print("  - get_checkpoint_stats: Checkpoint statistics")
    print("  - get_session_info: Current session information")

    await server.serve()


if __name__ == "__main__":
    try:
        asyncio.run(run_resumable_server())
    except KeyboardInterrupt:
        print("\n⏹️ Server stopped by user")
    except Exception as e:
        print(f"❌ Error running server: {e}")
	
Writing MCP_resumable_server/server.py
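The server saves a checkpoint after every batch, so `checkpoints.json` is rewritten often. If the process dies in the middle of one of those writes, the file can be left truncated and the next start would fail to load it. A common mitigation, which is not what the code above does, is to write to a temporary file and then rename it over the target. The helper name `atomic_write_json` below is an assumption for illustration only.

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_write_json(path: Path, payload: dict) -> None:
    """Write JSON to a temp file in the same directory, then rename it over the target."""
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(payload, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before renaming
        # os.replace swaps the file in one step (atomic on POSIX), so a reader
        # sees either the old content or the new one, never a half-written file
        os.replace(tmp_name, path)
    except BaseException:
        # Remove the temp file if anything failed before the rename
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


target = Path(tempfile.mkdtemp()) / "checkpoints.json"
atomic_write_json(target, {"abc": {"current_step": 10}})
atomic_write_json(target, {"abc": {"current_step": 20}})
loaded = json.loads(target.read_text())
print(loaded["abc"]["current_step"])
```

The temporary file is created in the same directory as the target so that the rename stays on one filesystem. For a demo like this one the direct write is fine, but for long tasks where losing the checkpoint is costly this kind of write is worth considering.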

Resumable MCP client

Here too, the client is very similar to the one from the previous streamable MCP post, but we add a JSON file with information about the sessions that were created. That way, if a session is interrupted in the middle of a task, it can continue from where it left off.
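The important piece of that JSON file is an identifier that survives client restarts, because the session ID assigned by the server can change on every new connection. A minimal sketch of the idea, assuming a hypothetical helper `load_or_create_client_id` and a file that holds only the `client_id` field:

```python
import json
import tempfile
import uuid
from pathlib import Path


def load_or_create_client_id(state_file: Path) -> str:
    """Return the client_id stored in state_file, creating and saving one if missing."""
    if state_file.exists():
        return json.loads(state_file.read_text())["client_id"]
    client_id = str(uuid.uuid4())[:16]  # short random identifier
    state_file.write_text(json.dumps({"client_id": client_id}, indent=2))
    return client_id


state_file = Path(tempfile.mkdtemp()) / "session_state.json"
first_run = load_or_create_client_id(state_file)   # new id generated and saved
second_run = load_or_create_client_id(state_file)  # same id read back after a "restart"
print(first_run == second_run)
```

Since the server derives the task ID from this identifier, sending the same `client_id` after a restart is what lets it find the checkpoint of the interrupted task.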

Implementing the resumable MCP client

Create the virtual environment for the client

First we create the folder where we are going to develop it.

	
!mkdir MCP_resumable_client

We create the project with uv

	
!cd MCP_resumable_client && uv init .
	
Initialized project `mcp-resumable-client` at `/Users/macm1/Documents/web/portafolio/posts/MCP_resumable_client`

We create the virtual environment

	
!cd MCP_resumable_client && uv venv
	
Using CPython 3.12.8
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate

We install the required libraries

	
!cd MCP_resumable_client && uv add fastmcp
	
Resolved 64 packages in 314ms
Installed 61 packages in 145ms
+ annotated-types==0.7.0
+ anyio==4.10.0
+ attrs==25.3.0
+ authlib==1.6.2
+ certifi==2025.8.3
+ cffi==1.17.1
+ charset-normalizer==3.4.3
+ click==8.2.1
+ cryptography==45.0.6
+ cyclopts==3.22.5
+ dnspython==2.7.0
+ docstring-parser==0.17.0
+ docutils==0.22
+ email-validator==2.2.0
+ exceptiongroup==1.3.0
+ fastmcp==2.11.3
+ h11==0.16.0
+ httpcore==1.0.9
+ httpx==0.28.1
+ httpx-sse==0.4.1
+ idna==3.10
+ isodate==0.7.2
+ jsonschema==4.25.1
+ jsonschema-path==0.3.4
+ jsonschema-specifications==2025.4.1
+ lazy-object-proxy==1.12.0
+ markdown-it-py==4.0.0
+ markupsafe==3.0.2
+ mcp==1.13.1
+ mdurl==0.1.2
+ more-itertools==10.7.0
+ openapi-core==0.19.5
+ openapi-pydantic==0.5.1
+ openapi-schema-validator==0.6.3
+ openapi-spec-validator==0.7.2
+ parse==1.20.2
+ pathable==0.4.4
+ pycparser==2.22
+ pydantic==2.11.7
+ pydantic-core==2.33.2
+ pydantic-settings==2.10.1
+ pygments==2.19.2
+ pyperclip==1.9.0
+ python-dotenv==1.1.1
+ python-multipart==0.0.20
+ pyyaml==6.0.2
+ referencing==0.36.2
+ requests==2.32.5
+ rfc3339-validator==0.1.4
+ rich==14.1.0
+ rich-rst==1.3.1
+ rpds-py==0.27.0
+ six==1.17.0
+ sniffio==1.3.1
+ sse-starlette==3.0.2
+ starlette==0.47.3
+ typing-extensions==4.14.1
+ typing-inspection==0.4.1
+ urllib3==2.5.0
+ uvicorn==0.35.0
+ werkzeug==3.1.1

Client code

Now let's create the MCP client. It will also be very similar to the one from the previous post, streamable MCP (this is the last time I say it, I promise), except that we also store the sessions in a JSON file so that we can recover those left halfway through a server task.

	
%%writefile MCP_resumable_client/client.py
#!/usr/bin/env python3
"""
Client MCP with resumability and session persistence capabilities.
Demonstrates how to connect to a resumable server and handle interruptions.
"""
import asyncio
import json
import time
import uuid
from typing import Any, Dict, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport
@dataclass
class TaskProgress:
"""Represents the progress of a task."""
task_id: str
task_name: str
current_step: int
total_steps: int
last_message: str
started_at: datetime = field(default_factory=datetime.now)
last_update: datetime = field(default_factory=datetime.now)
@property
def progress_percent(self) -> float:
"""Calculate progress percentage."""
if self.total_steps == 0:
return 0.0
return (self.current_step / self.total_steps) * 100
@dataclass
class SessionState:
"""Session state of the client."""
client_id: str # Persistent client ID
session_id: str # Session ID of the server (can change)
server_url: str
active_tasks: Dict[str, TaskProgress]
completed_tasks: List[str]
created_at: datetime = field(default_factory=datetime.now)
class ResumableProgressHandler:
"""Handles progress with resumability capabilities."""
def __init__(self, task_name: str, session_state: SessionState):
self.task_name = task_name
self.session_state = session_state
self.start_time = time.time()
self.last_progress = 0
self.task_progress: Optional[TaskProgress] = None
def __call__(self, progress: float, total: float, message: str):
"""Callback for progress updates."""
# Create or update task progress
if not self.task_progress:
# Try to find task_id from the message
task_id = self._extract_task_id(message)
if not task_id:
task_id = f"task_{int(time.time())}"
self.task_progress = TaskProgress(
task_id=task_id,
task_name=self.task_name,
current_step=int(progress),
total_steps=int(total),
last_message=message
)
self.session_state.active_tasks[task_id] = self.task_progress
else:
self.task_progress.current_step = int(progress)
self.task_progress.total_steps = int(total)
self.task_progress.last_message = message
self.task_progress.last_update = datetime.now()
# Display progress visually
self._display_progress(progress, total, message)
# Save session state
self._save_session_state()
def _extract_task_id(self, message: str) -> Optional[str]:
"""Try to extract task_id from the message."""
# Search for patterns like "task_id: abc123" in the message
import re
match = re.search(r'task[_s]*id[:s]*([a-f0-9]{16})', message.lower())
return match.group(1) if match else None
def _display_progress(self, progress: float, total: float, message: str):
"""Display progress visually."""
percentage = (progress / total) * 100 if total > 0 else 0
bar_length = 30
filled_length = int(bar_length * percentage / 100)
bar = '█' * filled_length + '░' * (bar_length - filled_length)
elapsed = time.time() - self.start_time
print(f" 📊 {self.task_name}: |{bar}| {percentage:.1f}% "
f"({progress:.0f}/{total:.0f}) - {message} [{elapsed:.1f}s]",
end='', flush=True)
if progress >= total:
print() # New line when completed
if self.task_progress:
# Move to completed
task_id = self.task_progress.task_id
self.session_state.completed_tasks.append(task_id)
if task_id in self.session_state.active_tasks:
del self.session_state.active_tasks[task_id]
self._save_session_state()
def _save_session_state(self):
"""Save session state."""
# In a real case, this would be saved to disk
pass
class MCPResumableClient:
"""Client MCP with resumability capabilities."""
def __init__(self, server_url: str = "http://localhost:8001/mcp/"):
self.server_url = server_url
self.transport = None
self.client = None
self.session_state: Optional[SessionState] = None
self.state_file = Path("session_state.json")
async def __aenter__(self):
"""Initialize connection and load state."""
# Load previous state if it exists
self._load_session_state()
self.transport = StreamableHttpTransport(
url=self.server_url,
sse_read_timeout=120.0 # Timeout for resumable tasks
)
self.client = Client(transport=self.transport)
await self.client.__aenter__()
# Create new session state if it doesn't exist
if not self.session_state:
# Generate unique persistent client ID
client_id = str(uuid.uuid4())[:16] # Use only the first 16 characters
# Get real session_id from the server
try:
session_info_result = await self.client.call_tool("get_session_info", {})
# Extract content from CallToolResult
if hasattr(session_info_result, 'content') and session_info_result.content:
session_info_text = session_info_result.content[0].text
if isinstance(session_info_text, str):
session_info = json.loads(session_info_text)
else:
session_info = session_info_text
else:
session_info = {}
real_session_id = session_info.get("session_id", f"session_{int(time.time())}")
print("🏗️ Creating new session")
print(f"🔗 Session ID of the server: {real_session_id}")
print(f"🆔 Persistent Client ID: {client_id}")
except Exception as e:
print(f"⚠️ Could not get session_id from the server: {e}")
real_session_id = f"session_{int(time.time())}"
self.session_state = SessionState(
client_id=client_id,
session_id=real_session_id,
server_url=self.server_url,
active_tasks={},
completed_tasks=[]
)
self._save_session_state()
else:
# Reuse existing client_id
print(f"🆔 Reusing Persistent Client ID: {self.session_state.client_id}")
# Verify if the session_id matches the server
try:
session_info_result = await self.client.call_tool("get_session_info", {})
# Extract content from CallToolResult
if hasattr(session_info_result, 'content') and session_info_result.content:
session_info_text = session_info_result.content[0].text
if isinstance(session_info_text, str):
session_info = json.loads(session_info_text)
else:
session_info = session_info_text
else:
session_info = {}
server_session_id = session_info.get("session_id")
if server_session_id and server_session_id != self.session_state.session_id:
print(f"⚠️ Session ID of the server changed: {self.session_state.session_id}{server_session_id}")
self.session_state.session_id = server_session_id
self._save_session_state()
except Exception as e:
print(f"⚠️ Could not verify session_id: {e}")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Close connection and save state."""
if self.client:
await self.client.__aexit__(exc_type, exc_val, exc_tb)
self._save_session_state()
def _load_session_state(self):
"""Load session state from disk."""
if self.state_file.exists():
try:
with open(self.state_file, 'r') as f:
data = json.load(f)
# Reconstruct objects
active_tasks = {}
for task_id, task_data in data.get('active_tasks', {}).items():
task_data['started_at'] = datetime.fromisoformat(task_data['started_at'])
task_data['last_update'] = datetime.fromisoformat(task_data['last_update'])
active_tasks[task_id] = TaskProgress(**task_data)
self.session_state = SessionState(
client_id=data.get('client_id', str(uuid.uuid4())[:16]), # Generate if not exists
session_id=data['session_id'],
server_url=data['server_url'],
active_tasks=active_tasks,
completed_tasks=data.get('completed_tasks', []),
created_at=datetime.fromisoformat(data['created_at'])
)
print(f"📂 Session state loaded: {self.session_state.session_id}")
except Exception as e:
print(f"⚠️ Error loading session state: {e}")
self.session_state = None
else:
print("💬 No session state found, starting new session")
self.session_state = None
def _save_session_state(self):
"""Save session state to disk."""
if not self.session_state:
return
try:
# Convert to serializable format
active_tasks = {}
for task_id, task_progress in self.session_state.active_tasks.items():
active_tasks[task_id] = {
'task_id': task_progress.task_id,
'task_name': task_progress.task_name,
'current_step': task_progress.current_step,
'total_steps': task_progress.total_steps,
'last_message': task_progress.last_message,
'started_at': task_progress.started_at.isoformat(),
'last_update': task_progress.last_update.isoformat()
}
data = {
'client_id': self.session_state.client_id,
'session_id': self.session_state.session_id,
'server_url': self.session_state.server_url,
'active_tasks': active_tasks,
'completed_tasks': self.session_state.completed_tasks,
'created_at': self.session_state.created_at.isoformat()
}
with open(self.state_file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"⚠️ Error saving session state: {e}")
async def test_connection(self) -> bool:
"""Test connection to the server."""
try:
await self.client.ping()
print(f"✅ Connection established with the resumable server")
return True
except Exception as e:
print(f"❌ Connection error: {e}")
return False
async def call_resumable_tool(
self,
tool_name: str,
parameters: Dict[str, Any],
allow_resume: bool = True
) -> Dict[str, Any]:
"""Call tool with resumability capabilities."""
progress_handler = ResumableProgressHandler(tool_name, self.session_state)
start_time = time.time()
# Add client_id to the parameters if allow_resume is enabled
if allow_resume and self.session_state:
parameters = {**parameters, 'client_id': self.session_state.client_id}
try:
print(f"🚀 Executing {tool_name} (resumption: {'✅' if allow_resume else '❌'})")
result = await self.client.call_tool(
tool_name,
parameters,
progress_handler=progress_handler
)
duration = time.time() - start_time
print(f"✅ {tool_name} completed in {duration:.2f}s")
# Extract content from CallToolResult
if hasattr(result, 'content') and result.content:
if isinstance(result.content[0].text, str):
try:
return json.loads(result.content[0].text)
except json.JSONDecodeError:
return {"text": result.content[0].text}
else:
return result.content[0].text
else:
return {"error": "No content in result"}
except KeyboardInterrupt:
print(f" ⏸️ {tool_name} interrupted by the user")
# In a real implementation, here we would pause the task
raise
except Exception as e:
duration = time.time() - start_time
print(f"❌ {tool_name} failed after {duration:.2f}s: {e}")
raise

    async def list_session_tasks(self) -> Dict[str, Any]:
        """List session tasks."""
        try:
            parameters = {}
            if self.session_state:
                parameters['client_id'] = self.session_state.client_id
            result = await self.client.call_tool("list_session_tasks", parameters)
            # Extract content from CallToolResult
            if hasattr(result, 'content') and result.content:
                if isinstance(result.content[0].text, str):
                    try:
                        return json.loads(result.content[0].text)
                    except json.JSONDecodeError:
                        return {"text": result.content[0].text}
                else:
                    return result.content[0].text
            else:
                return {"error": "No content in result"}
        except Exception as e:
            print(f"❌ Error listing session tasks: {e}")
            return {"error": str(e)}

    async def pause_task(self, task_id: str) -> Dict[str, Any]:
        """Pause a specific task."""
        try:
            result = await self.client.call_tool("pause_task", {"task_id": task_id})
            # Extract content from CallToolResult
            if hasattr(result, 'content') and result.content:
                if isinstance(result.content[0].text, str):
                    try:
                        return json.loads(result.content[0].text)
                    except json.JSONDecodeError:
                        return {"text": result.content[0].text}
                else:
                    return result.content[0].text
            else:
                return {"error": "No content in result"}
        except Exception as e:
            print(f"❌ Error pausing task {task_id}: {e}")
            return {"error": str(e)}

    async def get_checkpoint_stats(self) -> Dict[str, Any]:
        """Get checkpoint statistics."""
        try:
            result = await self.client.call_tool("get_checkpoint_stats", {})
            # Extract content from CallToolResult
            if hasattr(result, 'content') and result.content:
                if isinstance(result.content[0].text, str):
                    try:
                        return json.loads(result.content[0].text)
                    except json.JSONDecodeError:
                        return {"text": result.content[0].text}
                else:
                    return result.content[0].text
            else:
                return {"error": "No content in result"}
        except Exception as e:
            print(f"❌ Error getting checkpoint statistics: {e}")
            return {"error": str(e)}

    def show_session_summary(self):
        """Show session summary."""
        if not self.session_state:
            print("❌ No session state available")
            return
        print(" " + "="*60)
        print("📋 SESSION SUMMARY")
        print("="*60)
        print(f"🆔 Client ID (persistent): {self.session_state.client_id}")
        print(f"📌 Session ID (server): {self.session_state.session_id}")
        print(f"🔗 Server: {self.session_state.server_url}")
        print(f"📅 Created: {self.session_state.created_at.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f" 🔄 Active Tasks: {len(self.session_state.active_tasks)}")
        for task_id, task in self.session_state.active_tasks.items():
            progress = task.progress_percent
            print(f" • {task.task_name} ({task_id[:8]}...): {progress:.1f}%")
            print(f" └─ {task.last_message}")
        print(f" ✅ Completed Tasks: {len(self.session_state.completed_tasks)}")
        for task_id in self.session_state.completed_tasks[-5:]:  # Last 5
            print(f" • {task_id[:8]}...")


async def demo_resumable_data_processing(client: MCPResumableClient):
    """Demo of resumable data processing."""
    print(" " + "="*60)
    print("📊 DEMO: Resumable Data Processing")
    print("="*60)
    result = await client.call_resumable_tool(
        "resumable_data_processing",
        {
            "dataset_name": "customer_data",
            "total_records": 50,
            "batch_size": 5
        }
    )
    if "error" not in result:
        print(f"📊 Result: {result.get('total_processed', 0)} records processed")
        print(f"🆔 Task ID: {result.get('task_id', 'N/A')}")


async def interactive_demo(client: MCPResumableClient):
    """Interactive demo with options."""
    while True:
        print(" " + "="*60)
        print("🎮 INTERACTIVE DEMO - Resumable Client")
        print("="*60)
        print("1. Resumable data processing")
        print("2. List session tasks")
        print("3. Checkpoint statistics")
        print("4. Session summary")
        print("0. Exit")
        print("-" * 60)
        try:
            choice = input("Select an option (0-4): ").strip()
            if choice == "0":
                break
            elif choice == "1":
                await demo_resumable_data_processing(client)
            elif choice == "2":
                tasks = await client.list_session_tasks()
                print(f"📋 Session tasks: {json.dumps(tasks, indent=2)}")
            elif choice == "3":
                stats = await client.get_checkpoint_stats()
                print(f"📊 Checkpoint statistics: {json.dumps(stats, indent=2)}")
            elif choice == "4":
                client.show_session_summary()
            else:
                print("❌ Invalid option")
        except KeyboardInterrupt:
            print(" ⏸️ Demo interrupted")
            break
        except Exception as e:
            print(f"❌ Error: {e}")


async def run_resumable_demo():
    """Run resumable demo."""
    print("🌟 Resumable Client MCP - Demo")
    print("="*60)
    try:
        async with MCPResumableClient() as client:
            # Test connection
            if not await client.test_connection():
                print("❌ Could not connect to the resumable server.")
                print(" Make sure the server is running on port 8001")
                return
            # Show session state
            client.show_session_summary()
            # Run interactive demo
            await interactive_demo(client)
            print(" 🎉 Demo completed")
    except Exception as e:
        print(f"❌ Error in the demo: {e}")


if __name__ == "__main__":
    try:
        asyncio.run(run_resumable_demo())
    except KeyboardInterrupt:
        print(" ⏹️ Demo interrupted by the user")
    except Exception as e:
        print(f"❌ Error running demo: {e}")
	
Overwriting MCP_resumable_client/client.py
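
The client methods above repeat the same block for unpacking a tool result. As a minimal sketch, that logic could be factored into a single helper. The name `extract_tool_result` is hypothetical, and the `SimpleNamespace` objects only stand in for the real result object, assuming it exposes `content[0].text` the way the code above reads it.

```python
import json
from types import SimpleNamespace
from typing import Any


def extract_tool_result(result: Any) -> Any:
    """Decode a tool result using the same rules as the methods in client.py."""
    content = getattr(result, "content", None)
    if not content:
        return {"error": "No content in result"}
    text = content[0].text
    if not isinstance(text, str):
        return text
    try:
        # The server is expected to answer with a JSON document
        return json.loads(text)
    except json.JSONDecodeError:
        # Fall back to wrapping plain text
        return {"text": text}


# Stand-in objects shaped like the result used above (content[0].text)
ok = SimpleNamespace(content=[SimpleNamespace(text='{"total_processed": 50}')])
plain = SimpleNamespace(content=[SimpleNamespace(text="not json")])
empty = SimpleNamespace(content=[])

print(extract_tool_result(ok))
print(extract_tool_result(plain))
print(extract_tool_result(empty))
```

Each method could then end with `return extract_tool_result(result)`, which keeps the error handling in one place.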

Execution

We will start the server, start the client, request a task from the server, stop both the client and the server to simulate a lost connection, and then restart both to resume the task from where it left off.

First run - interrupted process

Before running anything, let's look at the contents of the server and client folders.

First, the server folder.

	
!cd MCP_resumable_server && ls -lha
	
total 336
drwxr-xr-x@ 11 macm1 staff 352B Aug 25 10:26 .
drwxr-xr-x 112 macm1 staff 3.5K Aug 25 09:56 ..
-rw-r--r--@ 1 macm1 staff 5B Aug 25 09:37 .python-version
drwxr-xr-x@ 8 macm1 staff 256B Aug 25 09:38 .venv
-rw-r--r--@ 1 macm1 staff 0B Aug 25 09:37 README.md
drwxr-xr-x@ 3 macm1 staff 96B Aug 25 09:42 __pycache__
-rw-r--r--@ 1 macm1 staff 8.4K Aug 25 09:40 checkpoint_manager.py
-rw-r--r--@ 1 macm1 staff 98B Aug 25 09:37 main.py
-rw-r--r--@ 1 macm1 staff 213B Aug 25 09:38 pyproject.toml
-rw-r--r--@ 1 macm1 staff 9.2K Aug 25 09:42 server.py
-rw-r--r--@ 1 macm1 staff 128K Aug 25 09:38 uv.lock

We have .python-version, .venv, README.md, main.py, pyproject.toml and uv.lock, which uv created when setting up the project and its virtual environment, plus the __pycache__ folder generated by Python. We also have checkpoint_manager.py and server.py, the files we wrote for the resumable MCP server.

Now let's see which files are in the client folder.

	
!cd MCP_resumable_client && ls -lha
	
total 336
drwxr-xr-x@ 9 macm1 staff 288B Aug 25 10:26 .
drwxr-xr-x 112 macm1 staff 3.5K Aug 25 09:56 ..
-rw-r--r--@ 1 macm1 staff 5B Aug 25 09:56 .python-version
drwxr-xr-x@ 8 macm1 staff 256B Aug 25 09:57 .venv
-rw-r--r--@ 1 macm1 staff 0B Aug 25 09:56 README.md
-rw-r--r--@ 1 macm1 staff 20K Aug 25 10:24 client.py
-rw-r--r--@ 1 macm1 staff 98B Aug 25 09:56 main.py
-rw-r--r--@ 1 macm1 staff 190B Aug 25 09:57 pyproject.toml
-rw-r--r--@ 1 macm1 staff 128K Aug 25 09:57 uv.lock

As before, we have .python-version, .venv, README.md, main.py, pyproject.toml and uv.lock, created by uv, plus client.py, the file we wrote for the resumable MCP client.

With that checked, we start the server.

	
!cd MCP_resumable_server && source .venv/bin/activate && uv run server.py
	
🚀 Starting MCP server with resumability at 127.0.0.1:8001
✅ Resumable server ready at http://127.0.0.1:8001/mcp/
🔧 Tools with resumability:
- resumable_data_processing: Processing with checkpoints
- pause_task: Pause tasks
- list_session_tasks: List session tasks
- get_checkpoint_stats: Checkpoint statistics
- get_session_info: Current session information
INFO: Started server process [47049]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8001 (Press CTRL+C to quit)

Now we run the client, select option 1 (Resumable data processing) and interrupt it partway through with Ctrl+C.

	
!cd MCP_resumable_client && source .venv/bin/activate && uv run client.py
	
🌟 Resumable Client MCP - Demo
============================================================
💬 No session state found, starting new session
🏗️ Creating new session
🔗 Session ID of the server: 6bf64139282e41e0a2233ec92647bf02
🆔 Persistent Client ID: f0af843a-9b0e-42
✅ Connection established with the resumable server
============================================================
📋 SESSION SUMMARY
============================================================
🆔 Client ID (persistent): f0af843a-9b0e-42
📌 Session ID (server): 6bf64139282e41e0a2233ec92647bf02
🔗 Server: http://localhost:8001/mcp/
📅 Created: 2025-08-25 10:34:53
🔄 Active Tasks: 0
✅ Completed Tasks: 0
============================================================
🎮 INTERACTIVE DEMO - Resumable Client
============================================================
1. Resumable data processing
2. List session tasks
3. Checkpoint statistics
4. Session summary
0. Exit
------------------------------------------------------------
Select an option (0-4): 1
============================================================
📊 DEMO: Resumable Data Processing
============================================================
🚀 Executing resumable_data_processing (resumption: ✅)
[08/25/25 10:35:19] INFO Server log: logging.py:40
🆕 Starting new processing of 50 records
📊 resumable_data_processing: |███░░░░░░░░░░░░░░░░░░░░░░░░░░░| 10.0% (5/50) - Processed 5/50 records [1.0s]
📊 resumable_data_processing: |██████░░░░░░░░░░░░░░░░░░░░░░░░| 20.0% (10/50) - Processed 10/50 records [2.0s]
📊 resumable_data_processing: |█████████░░░░░░░░░░░░░░░░░░░░░| 30.0% (15/50) - Processed 15/50 records [3.0s]
^C
⏹️ Demo interrupted by the user

We also stop the server.

Now that both the server and the client are stopped, let's look at each folder again.

First, the server folder.

	
!cd MCP_resumable_server && ls -lha
	
total 336
drwxr-xr-x@ 12 macm1 staff 384B Aug 25 10:33 .
drwxr-xr-x 112 macm1 staff 3.5K Aug 25 09:56 ..
-rw-r--r--@ 1 macm1 staff 5B Aug 25 09:37 .python-version
drwxr-xr-x@ 8 macm1 staff 256B Aug 25 09:38 .venv
-rw-r--r--@ 1 macm1 staff 0B Aug 25 09:37 README.md
drwxr-xr-x@ 3 macm1 staff 96B Aug 25 09:42 __pycache__
-rw-r--r--@ 1 macm1 staff 8.4K Aug 25 09:40 checkpoint_manager.py
drwxr-xr-x@ 4 macm1 staff 128B Aug 25 10:35 checkpoints
-rw-r--r--@ 1 macm1 staff 98B Aug 25 09:37 main.py
-rw-r--r--@ 1 macm1 staff 213B Aug 25 09:38 pyproject.toml
-rw-r--r--@ 1 macm1 staff 9.2K Aug 25 09:42 server.py
-rw-r--r--@ 1 macm1 staff 128K Aug 25 09:38 uv.lock

Besides everything that was there before, there is now a folder called checkpoints, which holds the checkpoints of the tasks that were run.

	
!cd MCP_resumable_server/checkpoints && ls -lha
	
total 8
drwxr-xr-x@ 4 macm1 staff 128B Aug 25 10:35 .
drwxr-xr-x@ 12 macm1 staff 384B Aug 25 10:33 ..
-rw-r--r--@ 1 macm1 staff 1.1K Aug 25 10:35 checkpoints.json
drwxr-xr-x@ 3 macm1 staff 96B Aug 25 10:35 data

If we look at the contents of the JSON file, we can see

  "current_step": 15,
"total_steps": 50,
	
!cd MCP_resumable_server/checkpoints && cat checkpoints.json
	
{
"9d7bc504c50e0bce": {
"task_id": "9d7bc504c50e0bce",
"session_id": "864eec3a-be1b-43",
"task_name": "resumable_data_processing",
"parameters": {
"dataset_name": "customer_data",
"total_records": 50,
"batch_size": 5
},
"current_step": 15,
"total_steps": 50,
"status": "failed",
"created_at": "2025-08-25T08:35:19.349142+00:00",
"updated_at": "2025-08-25T08:35:23.366672+00:00",
"data": {
"last_batch": [
{
"record_id": "customer_data_000015",
"processed_at": "step_15",
"value": 88
},
{
"record_id": "customer_data_000016",
"processed_at": "step_16",
"value": 17
},
{
"record_id": "customer_data_000017",
"processed_at": "step_17",
"value": 596
},
{
"record_id": "customer_data_000018",
"processed_at": "step_18",
"value": 693
},
{
"record_id": "customer_data_000019",
"processed_at": "step_19",
"value": 34
}
]
},
"error_message": ""
}
}

In other words, we stopped at step 15 of 50, which matches the point in the client output where we interrupted the process.

📊 resumable_data_processing: |███░░░░░░░░░░░░░░░░░░░░░░░░░░░| 10.0% (5/50) - Processed 5/50 records [1.0s]
📊 resumable_data_processing: |██████░░░░░░░░░░░░░░░░░░░░░░░░| 20.0% (10/50) - Processed 10/50 records [2.0s]
📊 resumable_data_processing: |█████████░░░░░░░░░░░░░░░░░░░░░| 30.0% (15/50) - Processed 15/50 records [3.0s]
^C
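
To check what can be resumed without opening the file by hand, a small script can summarize it. This is only a sketch: it assumes the field names visible in the checkpoints.json shown above (`task_name`, `current_step`, `total_steps`, `status`), and `summarize_checkpoints` is a hypothetical helper, not part of checkpoint_manager.py. It writes a trimmed copy of the data to a temporary file so that it runs on its own.

```python
import json
import tempfile
from pathlib import Path


def summarize_checkpoints(path: Path) -> list[dict]:
    """Return one summary dict per task stored in a checkpoints.json file."""
    checkpoints = json.loads(path.read_text(encoding="utf-8"))
    summaries = []
    for task_id, cp in checkpoints.items():
        total = cp["total_steps"]
        done = cp["current_step"]
        summaries.append({
            "task_id": task_id,
            "task_name": cp["task_name"],
            "status": cp["status"],
            "progress_percent": 100.0 * done / total if total else 0.0,
            "remaining_steps": total - done,
        })
    return summaries


# Sample copied from the file shown above, trimmed to the fields we read
sample = {
    "9d7bc504c50e0bce": {
        "task_id": "9d7bc504c50e0bce",
        "task_name": "resumable_data_processing",
        "current_step": 15,
        "total_steps": 50,
        "status": "failed",
    }
}

with tempfile.TemporaryDirectory() as tmp:
    checkpoint_file = Path(tmp) / "checkpoints.json"
    checkpoint_file.write_text(json.dumps(sample), encoding="utf-8")
    summaries = summarize_checkpoints(checkpoint_file)

for s in summaries:
    print(f"{s['task_name']} ({s['task_id']}): {s['progress_percent']:.1f}% done, "
          f"{s['remaining_steps']} steps left, status={s['status']}")
```

Pointing `summarize_checkpoints` at MCP_resumable_server/checkpoints/checkpoints.json should give the same kind of summary for the real file, as long as its layout matches what is shown above.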

Now let's look at the client folder, where there is now a file called session_state.json.

	
!cd MCP_resumable_client && ls -lha
	
total 344
drwxr-xr-x@ 10 macm1 staff 320B Aug 25 10:35 .
drwxr-xr-x 112 macm1 staff 3.5K Aug 25 09:56 ..
-rw-r--r--@ 1 macm1 staff 5B Aug 25 09:56 .python-version
drwxr-xr-x@ 8 macm1 staff 256B Aug 25 09:57 .venv
-rw-r--r--@ 1 macm1 staff 0B Aug 25 09:56 README.md
-rw-r--r--@ 1 macm1 staff 20K Aug 25 10:24 client.py
-rw-r--r--@ 1 macm1 staff 98B Aug 25 09:56 main.py
-rw-r--r--@ 1 macm1 staff 190B Aug 25 09:57 pyproject.toml
-rw-r--r--@ 1 macm1 staff 546B Aug 25 10:35 session_state.json
-rw-r--r--@ 1 macm1 staff 128K Aug 25 09:57 uv.lock

Let's see what is inside.

	
!cd MCP_resumable_client && cat session_state.json
	
{
"client_id": "864eec3a-be1b-43",
"session_id": "d34ff655d6464fb4ae5eeb64c62989b0",
"server_url": "http://localhost:8001/mcp/",
"active_tasks": {
"task_1756110920": {
"task_id": "task_1756110920",
"task_name": "resumable_data_processing",
"current_step": 15,
"total_steps": 50,
"last_message": "Processed 15/50 records",
"started_at": "2025-08-25T10:35:20.353521",
"last_update": "2025-08-25T10:35:22.364507"
}
},
"completed_tasks": [],
"created_at": "2025-08-25T10:35:16.230926"
}

As in the server's JSON, we see

   "current_step": 15,
"total_steps": 50,

This also records that we stopped at step 15 of 50, but the most important part is

      "task_id": "task_1756110920",

which is where the client stores the task ID so that it can ask the server to resume the task.
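
Persisting this state is what lets a new client process find the interrupted task. The following is a minimal sketch of that save and load round trip, assuming only the fields visible in session_state.json. `TaskProgress`, `save_tasks` and `load_tasks` are hypothetical names chosen for illustration and may differ from what client.py actually defines.

```python
import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path


@dataclass
class TaskProgress:
    """Hypothetical mirror of one entry under "active_tasks"."""
    task_id: str
    task_name: str
    current_step: int
    total_steps: int
    last_message: str

    @property
    def progress_percent(self) -> float:
        if not self.total_steps:
            return 0.0
        return 100.0 * self.current_step / self.total_steps


def save_tasks(path: Path, tasks: dict[str, TaskProgress]) -> None:
    """Write the active tasks to disk so a later run can find them."""
    payload = {"active_tasks": {tid: asdict(t) for tid, t in tasks.items()}}
    path.write_text(json.dumps(payload, indent=2), encoding="utf-8")


def load_tasks(path: Path) -> dict[str, TaskProgress]:
    """Read the active tasks back; a missing file means a fresh session."""
    if not path.exists():
        return {}
    payload = json.loads(path.read_text(encoding="utf-8"))
    return {tid: TaskProgress(**t) for tid, t in payload["active_tasks"].items()}


task = TaskProgress(
    task_id="task_1756110920",
    task_name="resumable_data_processing",
    current_step=15,
    total_steps=50,
    last_message="Processed 15/50 records",
)

with tempfile.TemporaryDirectory() as tmp:
    state_file = Path(tmp) / "session_state.json"
    fresh = load_tasks(state_file)  # nothing saved yet
    save_tasks(state_file, {task.task_id: task})
    restored = load_tasks(state_file)  # what a restarted client would see

for tid, t in restored.items():
    print(f"{t.task_name} ({tid}): {t.progress_percent:.1f}% - {t.last_message}")
```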

Second run - resumed process

If we now restart the server

	
!cd MCP_resumable_server && source .venv/bin/activate && uv run server.py
	
🚀 Starting MCP server with resumability at 127.0.0.1:8001
✅ Resumable server ready at http://127.0.0.1:8001/mcp/
🔧 Tools with resumability:
- resumable_data_processing: Processing with checkpoints
- pause_task: Pause tasks
- list_session_tasks: List session tasks
- get_checkpoint_stats: Checkpoint statistics
- get_session_info: Current session information
INFO: Started server process [52338]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8001 (Press CTRL+C to quit)

and run the client again

	
!cd MCP_resumable_client && source .venv/bin/activate && uv run client.py
	
🌟 Resumable Client MCP - Demo
============================================================
📂 Session state loaded: d34ff655d6464fb4ae5eeb64c62989b0
🆔 Reusing Persistent Client ID: 864eec3a-be1b-43
⚠️ Session ID of the server changed: d34ff655d6464fb4ae5eeb64c62989b0 → 004bc91edde144898938136d0d2b5041
✅ Connection established with the resumable server
============================================================
📋 SESSION SUMMARY
============================================================
🆔 Client ID (persistent): 864eec3a-be1b-43
📌 Session ID (server): 004bc91edde144898938136d0d2b5041
🔗 Server: http://localhost:8001/mcp/
📅 Created: 2025-08-25 10:35:16
🔄 Active Tasks: 1
• resumable_data_processing (task_175...): 30.0%
└─ Processed 15/50 records
✅ Completed Tasks: 0
============================================================
🎮 INTERACTIVE DEMO - Resumable Client
============================================================
1. Resumable data processing
2. List session tasks
3. Checkpoint statistics
4. Session summary
0. Exit
------------------------------------------------------------
Select an option (0-4):

We can see that it says

🔄 Active Tasks: 1
• resumable_data_processing (task_175...): 30.0%
└─ Processed 15/50 records

The client recovered the information that task task_1756110920 had stopped at step 15 of 50.

Now we run the client again, select option 1, and see that processing continues from step 15.

	
!cd MCP_resumable_client && source .venv/bin/activate && uv run client.py
	
🌟 Resumable Client MCP - Demo
============================================================
📂 Session state loaded: 004bc91edde144898938136d0d2b5041
🆔 Reusing Persistent Client ID: 864eec3a-be1b-43
⚠️ Session ID of the server changed: 004bc91edde144898938136d0d2b5041 → 4b93c36446504a3d92467d39a8cba589
✅ Connection established with the resumable server
============================================================
📋 SESSION SUMMARY
============================================================
🆔 Client ID (persistent): 864eec3a-be1b-43
📌 Session ID (server): 4b93c36446504a3d92467d39a8cba589
🔗 Server: http://localhost:8001/mcp/
📅 Created: 2025-08-25 10:35:16
🔄 Active Tasks: 1
• resumable_data_processing (task_175...): 30.0%
└─ Processed 15/50 records
✅ Completed Tasks: 0
============================================================
🎮 INTERACTIVE DEMO - Resumable Client
============================================================
1. Resumable data processing
2. List session tasks
3. Checkpoint statistics
4. Session summary
0. Exit
------------------------------------------------------------
Select an option (0-4): 1
============================================================
📊 DEMO: Resumable Data Processing
============================================================
🚀 Executing resumable_data_processing (resumption: ✅)
[08/25/25 10:57:31] INFO Server log: 🔄 Resuming logging.py:40
processing from record 15
📊 resumable_data_processing: |████████████░░░░░░░░░░░░░░░░░░| 40.0% (20/50) - Processed 20/50 records [1.0s]
📊 resumable_data_processing: |███████████████░░░░░░░░░░░░░░░| 50.0% (25/50) - Processed 25/50 records [2.0s]
📊 resumable_data_processing: |██████████████████░░░░░░░░░░░░| 60.0% (30/50) - Processed 30/50 records [3.0s]
📊 resumable_data_processing: |█████████████████████░░░░░░░░░| 70.0% (35/50) - Processed 35/50 records [4.0s]
📊 resumable_data_processing: |████████████████████████░░░░░░| 80.0% (40/50) - Processed 40/50 records [5.0s]
📊 resumable_data_processing: |███████████████████████████░░░| 90.0% (45/50) - Processed 45/50 records [6.0s]
📊 resumable_data_processing: |██████████████████████████████| 100.0% (50/50) - Processed 50/50 records [7.0s]
[08/25/25 10:57:38] INFO Server log: ✅ Processing logging.py:40
completed: 55 records
✅ resumable_data_processing completed in 7.05s
📊 Result: 55 records processed
🆔 Task ID: 88e68f3bdf65cd22
============================================================
🎮 INTERACTIVE DEMO - Resumable Client
============================================================
1. Resumable data processing
2. List session tasks
3. Checkpoint statistics
4. Session summary
0. Exit
------------------------------------------------------------
Select an option (0-4): 0
🎉 Demo completed

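The server log shows "Resuming processing from record 15", so it picked up where it left off. Note that the final log reports 55 records rather than 50, which suggests that the batch in progress when we interrupted was counted again after resuming.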
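
The resume behaviour comes down to a loop that reads the last saved step before starting and writes a checkpoint after each batch. The sketch below shows that pattern in isolation, under stated assumptions: `process_with_checkpoints` and its `stop_after` parameter are hypothetical, the checkpoint file holds only `current_step` and `total_steps`, and this is not the code in server.py.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


def process_with_checkpoints(checkpoint_file: Path, total_records: int,
                             batch_size: int,
                             stop_after: Optional[int] = None) -> int:
    """Process records in batches, saving progress after every batch.

    Returns the number of records processed so far. `stop_after` simulates
    an interruption once that many records have been handled.
    """
    # Resume from the saved step if a checkpoint exists, otherwise start at 0
    if checkpoint_file.exists():
        current = json.loads(checkpoint_file.read_text())["current_step"]
    else:
        current = 0

    while current < total_records:
        if stop_after is not None and current >= stop_after:
            break  # simulated crash or Ctrl+C
        batch_end = min(current + batch_size, total_records)
        # ... real work on records current..batch_end-1 would go here ...
        current = batch_end
        # Save only after the batch is done, so a resumed run neither skips
        # work nor repeats a finished batch
        checkpoint_file.write_text(json.dumps(
            {"current_step": current, "total_steps": total_records}))
    return current


with tempfile.TemporaryDirectory() as tmp:
    cp = Path(tmp) / "checkpoint.json"
    first = process_with_checkpoints(cp, total_records=50, batch_size=5, stop_after=15)
    second = process_with_checkpoints(cp, total_records=50, batch_size=5)

print(f"first run stopped at {first}, second run finished at {second}")
```

Storing the index of the next unprocessed record, rather than the start of the batch in flight, is the design choice that keeps the running total exact across restarts.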

With this, we can build MCP servers and clients that run long tasks and resume them if they are interrupted.

This is also useful when a task is very long, because the client does not have to stay running and waiting: it can request the task from the server and then check its status from time to time.
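
That polling pattern can be sketched as a loop that asks for the status, sleeps, and asks again. Everything here is an assumption made for illustration: `wait_for_task` is a hypothetical helper, the status dictionary and its "running" and "completed" values are invented for the example, and `fake_status` stands in for a real call to the server such as a wrapper around `list_session_tasks`.

```python
import asyncio
from typing import Awaitable, Callable


async def wait_for_task(get_status: Callable[[], Awaitable[dict]],
                        poll_interval: float = 5.0,
                        max_polls: int = 100) -> dict:
    """Poll `get_status` until the task finishes or the polls run out."""
    status: dict = {}
    for _ in range(max_polls):
        status = await get_status()
        if status.get("status") in ("completed", "failed"):
            break
        # Between polls the client is free to do other work or simply sleep
        await asyncio.sleep(poll_interval)
    return status


async def main() -> dict:
    # Fake status source standing in for a real request to the server
    steps = iter([15, 30, 50])

    async def fake_status() -> dict:
        step = next(steps)
        return {
            "current_step": step,
            "total_steps": 50,
            "status": "completed" if step == 50 else "running",
        }

    return await wait_for_task(fake_status, poll_interval=0.01)


final = asyncio.run(main())
print(final)
```

The `max_polls` limit keeps the loop from running forever if the server never reports a final state.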
