Resumable MCP: How to Create Servers and Clients with Automatic Checkpoints


In the previous post, streamable MCP, we saw how to make the MCP server send information about the process it is running, so the client can show that information to the user.

This is useful when the process the MCP server performs is long.

But what happens if that process gets interrupted? The server crashes, the connection drops, etc. We would have to ask the MCP server to start the whole process over again.

So, to avoid that, we are going to explain how to create an MCP server and client that can pick up an in-progress process. That way, if it goes down for whatever reason, the process can be resumed from where it left off.
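The core idea fits in a few lines of plain Python, independent of MCP: persist the current step after every unit of work, and on startup read it back instead of starting from zero. This is a minimal sketch with a hypothetical `process_with_checkpoints` function, not the post's actual implementation:

```python
import json
from pathlib import Path

def process_with_checkpoints(total_steps: int, checkpoint_file: Path, fail_at: int = -1) -> int:
    """Run steps 0..total_steps-1, persisting progress after each one."""
    start = 0
    if checkpoint_file.exists():
        # Resume from the last completed step instead of starting over
        start = json.loads(checkpoint_file.read_text())["current_step"]
    for step in range(start, total_steps):
        if step == fail_at:
            raise RuntimeError("simulated crash")  # stands in for a lost connection
        # ... the real work for this step would go here ...
        checkpoint_file.write_text(json.dumps({"current_step": step + 1}))
    return total_steps
```

If the first run dies at step 3, a second run starts at step 3 rather than step 0 — that is exactly the behavior we want from the server below, just with sessions, task IDs, and progress reporting layered on top.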

Resumable MCP server

This server is very similar to the one we built in the streamable MCP post, except that we also create checkpoints so that a process can be resumed if it gets interrupted.

So let's see how to implement it.

Implementing the resumable MCP server

Creating the virtual environment

First we create the folder where we will develop it

	
!mkdir MCP_resumable_server

We initialize the project with uv

	
!cd MCP_resumable_server && uv init .

Initialized project `mcp-resumable-server` at `/Users/macm1/Documents/web/portafolio/posts/MCP_resumable_server`

We create the virtual environment

	
!cd MCP_resumable_server && uv venv

Using CPython 3.12.8
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate

We install the necessary libraries

	
!cd MCP_resumable_server && uv add fastmcp uvicorn

Resolved 64 packages in 548ms
Prepared 1 package in 134ms
Installed 61 packages in 152ms
+ annotated-types==0.7.0
+ anyio==4.10.0
+ attrs==25.3.0
+ authlib==1.6.2
+ certifi==2025.8.3
+ cffi==1.17.1
+ charset-normalizer==3.4.3
+ click==8.2.1
+ cryptography==45.0.6
+ cyclopts==3.22.5
+ dnspython==2.7.0
+ docstring-parser==0.17.0
+ docutils==0.22
+ email-validator==2.2.0
+ exceptiongroup==1.3.0
+ fastmcp==2.11.3
+ h11==0.16.0
+ httpcore==1.0.9
+ httpx==0.28.1
+ httpx-sse==0.4.1
+ idna==3.10
+ isodate==0.7.2
+ jsonschema==4.25.1
+ jsonschema-path==0.3.4
+ jsonschema-specifications==2025.4.1
+ lazy-object-proxy==1.12.0
+ markdown-it-py==4.0.0
+ markupsafe==3.0.2
+ mcp==1.13.1
+ mdurl==0.1.2
+ more-itertools==10.7.0
+ openapi-core==0.19.5
+ openapi-pydantic==0.5.1
+ openapi-schema-validator==0.6.3
+ openapi-spec-validator==0.7.2
+ parse==1.20.2
+ pathable==0.4.4
+ pycparser==2.22
+ pydantic==2.11.7
+ pydantic-core==2.33.2
+ pydantic-settings==2.10.1
+ pygments==2.19.2
+ pyperclip==1.9.0
+ python-dotenv==1.1.1
+ python-multipart==0.0.20
+ pyyaml==6.0.2
+ referencing==0.36.2
+ requests==2.32.5
+ rfc3339-validator==0.1.4
+ rich==14.1.0
+ rich-rst==1.3.1
+ rpds-py==0.27.0
+ six==1.17.0
+ sniffio==1.3.1
+ sse-starlette==3.0.2
+ starlette==0.47.3
+ typing-extensions==4.14.1
+ typing-inspection==0.4.1
+ urllib3==2.5.0
+ uvicorn==0.35.0
+ werkzeug==3.1.1

Server code

Now we write the code for the resumable MCP server.

Checkpoint manager

As we said, the big difference from the server in the previous post, streamable MCP, is that this one saves its state in checkpoints so the process can be resumed if it is interrupted. So let's create the checkpoint manager.
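The one non-obvious part of persisting checkpoints as JSON is that `Enum` and `datetime` fields are not JSON-serializable, so they have to be converted to strings on the way out and parsed back on the way in — which is what `TaskCheckpoint.to_dict`/`from_dict` do below. Here is that trick in isolation, with a trimmed-down hypothetical `Checkpoint` carrying only a few fields:

```python
import json
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from enum import Enum

class TaskStatus(Enum):
    RUNNING = "running"
    PAUSED = "paused"

@dataclass
class Checkpoint:
    task_id: str
    current_step: int
    status: TaskStatus
    updated_at: datetime

    def to_dict(self) -> dict:
        d = asdict(self)
        d["status"] = d["status"].value              # Enum -> plain string
        d["updated_at"] = d["updated_at"].isoformat()  # datetime -> ISO 8601 string
        return d

    @classmethod
    def from_dict(cls, d: dict) -> "Checkpoint":
        d = dict(d)  # don't mutate the caller's dict
        d["status"] = TaskStatus(d["status"])
        d["updated_at"] = datetime.fromisoformat(d["updated_at"])
        return cls(**d)

cp = Checkpoint("abc123", 40, TaskStatus.PAUSED, datetime.now(timezone.utc))
restored = Checkpoint.from_dict(json.loads(json.dumps(cp.to_dict())))
```

The round trip through `json.dumps`/`json.loads` is lossless: `fromisoformat` restores the timestamp (including timezone and microseconds) exactly, and `TaskStatus(...)` maps the stored string back to the enum member.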

	
%%writefile MCP_resumable_server/checkpoint_manager.py
#!/usr/bin/env python3
"""
Checkpoint and resumability system for MCP streaming tasks.
Allows saving the state of long tasks and resuming them from where they were interrupted.
"""
import json
import pickle
import hashlib
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, asdict
from enum import Enum


class TaskStatus(Enum):
    """Task states."""
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


@dataclass
class TaskCheckpoint:
    """Represents a checkpoint of a task."""
    task_id: str
    session_id: str
    task_name: str
    parameters: Dict[str, Any]
    current_step: int
    total_steps: int
    status: TaskStatus
    created_at: datetime
    updated_at: datetime
    data: Dict[str, Any]  # Task-specific state
    error_message: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to serializable dictionary."""
        data = asdict(self)
        data['status'] = data['status'].value
        data['created_at'] = data['created_at'].isoformat()
        data['updated_at'] = data['updated_at'].isoformat()
        return data

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'TaskCheckpoint':
        """Create from dictionary."""
        data['status'] = TaskStatus(data['status'])
        data['created_at'] = datetime.fromisoformat(data['created_at'])
        data['updated_at'] = datetime.fromisoformat(data['updated_at'])
        return cls(**data)


class CheckpointManager:
    """Checkpoint manager for streaming tasks."""

    def __init__(self, storage_dir: str = "checkpoints"):
        self.storage_dir = Path(storage_dir)
        self.storage_dir.mkdir(exist_ok=True)
        self.checkpoints_file = self.storage_dir / "checkpoints.json"
        self.data_dir = self.storage_dir / "data"
        self.data_dir.mkdir(exist_ok=True)
        self._checkpoints: Dict[str, TaskCheckpoint] = {}
        self._load_checkpoints()

    def _load_checkpoints(self):
        """Load checkpoints from disk."""
        if self.checkpoints_file.exists():
            try:
                with open(self.checkpoints_file, 'r') as f:
                    data = json.load(f)
                self._checkpoints = {
                    task_id: TaskCheckpoint.from_dict(checkpoint_data)
                    for task_id, checkpoint_data in data.items()
                }
            except Exception as e:
                print(f"❌ Error loading checkpoints: {e}")
                self._checkpoints = {}

    def _save_checkpoints(self):
        """Save checkpoints to disk."""
        try:
            data = {
                task_id: checkpoint.to_dict()
                for task_id, checkpoint in self._checkpoints.items()
            }
            with open(self.checkpoints_file, 'w') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            print(f"❌ Error saving checkpoints: {e}")

    def generate_task_id(self, session_id: str, task_name: str, parameters: Dict[str, Any]) -> str:
        """Generate unique ID for a task."""
        # Create hash based on session, task and parameters
        data = f"{session_id}-{task_name}-{json.dumps(parameters, sort_keys=True)}"
        return hashlib.sha256(data.encode()).hexdigest()[:16]

    def create_checkpoint(
        self,
        session_id: str,
        task_name: str,
        parameters: Dict[str, Any],
        total_steps: int,
        initial_data: Optional[Dict[str, Any]] = None
    ) -> str:
        """Create new checkpoint."""
        task_id = self.generate_task_id(session_id, task_name, parameters)
        checkpoint = TaskCheckpoint(
            task_id=task_id,
            session_id=session_id,
            task_name=task_name,
            parameters=parameters,
            current_step=0,
            total_steps=total_steps,
            status=TaskStatus.PENDING,
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
            data=initial_data or {}
        )
        self._checkpoints[task_id] = checkpoint
        self._save_checkpoints()
        return task_id

    def update_checkpoint(
        self,
        task_id: str,
        current_step: int,
        status: TaskStatus,
        data: Optional[Dict[str, Any]] = None,
        error_message: Optional[str] = None
    ) -> bool:
        """Update existing checkpoint."""
        if task_id not in self._checkpoints:
            return False
        checkpoint = self._checkpoints[task_id]
        checkpoint.current_step = current_step
        checkpoint.status = status
        checkpoint.updated_at = datetime.now(timezone.utc)
        if data is not None:
            checkpoint.data.update(data)
        if error_message is not None:
            checkpoint.error_message = error_message
        self._save_checkpoints()
        return True

    def get_checkpoint(self, task_id: str) -> Optional[TaskCheckpoint]:
        """Get checkpoint by ID."""
        return self._checkpoints.get(task_id)

    def find_resumable_task(
        self,
        session_id: str,
        task_name: str,
        parameters: Dict[str, Any]
    ) -> Optional[TaskCheckpoint]:
        """Find resumable task with the same parameters."""
        task_id = self.generate_task_id(session_id, task_name, parameters)
        checkpoint = self._checkpoints.get(task_id)
        if checkpoint and checkpoint.status in [TaskStatus.PAUSED, TaskStatus.RUNNING]:
            return checkpoint
        return None

    def get_session_checkpoints(self, session_id: str) -> List[TaskCheckpoint]:
        """Get all checkpoints of a session."""
        return [
            checkpoint for checkpoint in self._checkpoints.values()
            if checkpoint.session_id == session_id
        ]

    def save_task_data(self, task_id: str, data: Any) -> bool:
        """Save specific task data."""
        try:
            data_file = self.data_dir / f"{task_id}.pkl"
            with open(data_file, 'wb') as f:
                pickle.dump(data, f)
            return True
        except Exception as e:
            print(f"❌ Error saving task data {task_id}: {e}")
            return False

    def load_task_data(self, task_id: str) -> Any:
        """Load specific task data."""
        try:
            data_file = self.data_dir / f"{task_id}.pkl"
            if data_file.exists():
                with open(data_file, 'rb') as f:
                    return pickle.load(f)
            return None
        except Exception as e:
            print(f"❌ Error loading task data {task_id}: {e}")
            return None

    def delete_checkpoint(self, task_id: str) -> bool:
        """Delete checkpoint and associated data."""
        try:
            # Delete checkpoint
            if task_id in self._checkpoints:
                del self._checkpoints[task_id]
                self._save_checkpoints()
            # Delete data
            data_file = self.data_dir / f"{task_id}.pkl"
            if data_file.exists():
                data_file.unlink()
            return True
        except Exception as e:
            print(f"❌ Error deleting checkpoint {task_id}: {e}")
            return False

    def cleanup_old_checkpoints(self, max_age_days: int = 7) -> int:
        """Clean up old checkpoints."""
        # timedelta handles month/year boundaries correctly, unlike replacing the day field
        cutoff_date = datetime.now(timezone.utc) - timedelta(days=max_age_days)
        to_delete = []
        for task_id, checkpoint in self._checkpoints.items():
            if (checkpoint.status in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED]
                    and checkpoint.updated_at < cutoff_date):
                to_delete.append(task_id)
        for task_id in to_delete:
            self.delete_checkpoint(task_id)
        return len(to_delete)

    def get_stats(self) -> Dict[str, int]:
        """Get checkpoint statistics."""
        stats = {status.value: 0 for status in TaskStatus}
        for checkpoint in self._checkpoints.values():
            stats[checkpoint.status.value] += 1
        return stats


# Global singleton for the checkpoint manager
checkpoint_manager = CheckpointManager()

Writing MCP_resumable_server/checkpoint_manager.py
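Resumability hinges on `generate_task_id` being deterministic: the same session, tool name, and parameters always hash to the same `task_id`, so a retried call finds the checkpoint left by the interrupted one. A standalone sketch of the same hashing logic (mirroring the method above, outside the class):

```python
import hashlib
import json

def generate_task_id(session_id: str, task_name: str, parameters: dict) -> str:
    # sort_keys makes the hash independent of the dict's insertion order,
    # so logically-equal parameter dicts always produce the same ID
    payload = f"{session_id}-{task_name}-{json.dumps(parameters, sort_keys=True)}"
    return hashlib.sha256(payload.encode()).hexdigest()[:16]
```

Note the consequence: calling the tool again with even slightly different parameters (say, a different `batch_size`) yields a different `task_id` and therefore starts a fresh task instead of resuming the old one.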
Server

Now we create the server code


	
%%writefile MCP_resumable_server/server.py
#!/usr/bin/env python3
"""
MCP server with resumability and checkpoints.
Demonstrates how to implement tasks that can be paused and resumed.
"""
import asyncio
import uvicorn
import random
from typing import Dict, List, Optional, Any
from fastmcp import FastMCP, Context
from fastmcp.server.http import create_streamable_http_app
from checkpoint_manager import checkpoint_manager, TaskStatus, TaskCheckpoint

# Create instance of the MCP server
mcp = FastMCP(
    name="Resumable Server",
    instructions="MCP server with resumability and checkpoints for long tasks"
)


@mcp.tool
async def resumable_data_processing(
    dataset_name: str = "default_dataset",
    total_records: int = 100,
    batch_size: int = 10,
    client_id: Optional[str] = None,  # Persistent client_id
    context: Context = None
) -> Dict[str, Any]:
    """
    Resumable data processing with automatic checkpoints.

    Args:
        dataset_name: Name of the dataset to process
        total_records: Total records to process
        batch_size: Size of the batch per iteration
        client_id: Persistent client id for resumability
    """
    if not context:
        return {"error": "Requires context for resumability"}
    # Use persistent client id if available, otherwise use session_id
    persistent_id = client_id if client_id else context.session_id
    if not persistent_id:
        return {"error": "Requires client_id or session_id for resumability"}
    task_name = "resumable_data_processing"
    parameters = {
        "dataset_name": dataset_name,
        "total_records": total_records,
        "batch_size": batch_size
    }
    # Search for existing task to resume - include interrupted tasks
    task_id = checkpoint_manager.generate_task_id(persistent_id, task_name, parameters)
    existing_checkpoint = checkpoint_manager.get_checkpoint(task_id)
    if existing_checkpoint and existing_checkpoint.current_step > 0:
        # Resume from checkpoint
        await context.info(f"🔄 Resuming processing from record {existing_checkpoint.current_step}")
        start_record = existing_checkpoint.current_step
        processed_data = checkpoint_manager.load_task_data(task_id) or []
        # Mark as running
        checkpoint_manager.update_checkpoint(
            task_id, start_record, TaskStatus.RUNNING
        )
    else:
        # New task
        await context.info(f"🆕 Starting new processing of {total_records} records")
        if not existing_checkpoint:
            task_id = checkpoint_manager.create_checkpoint(
                persistent_id, task_name, parameters, total_records
            )
        start_record = 0
        processed_data = []
        # Mark as running
        checkpoint_manager.update_checkpoint(
            task_id, 0, TaskStatus.RUNNING
        )
    record_num = start_record  # So the except block has a valid step if we fail before the loop runs
    try:
        # Process in batches
        for record_num in range(start_record, total_records, batch_size):
            batch_end = min(record_num + batch_size, total_records)
            # Simulate batch processing
            await asyncio.sleep(1)  # Simulate work
            # Process batch
            batch_results = []
            for i in range(record_num, batch_end):
                # Simulate record processing
                result = {
                    "record_id": f"{dataset_name}_{i:06d}",
                    "processed_at": f"step_{i}",
                    "value": random.randint(1, 1000)
                }
                batch_results.append(result)
            processed_data.extend(batch_results)
            # Save checkpoint for each batch
            checkpoint_manager.update_checkpoint(
                task_id,
                batch_end,
                TaskStatus.RUNNING,
                {"last_batch": batch_results}
            )
            # Save processed data
            checkpoint_manager.save_task_data(task_id, processed_data)
            # Report progress
            await context.report_progress(
                progress=batch_end,
                total=total_records,
                message=f"Processed {batch_end}/{total_records} records"
            )
            await context.debug(f"Batch {record_num}-{batch_end-1} completed")
        # Mark as completed
        checkpoint_manager.update_checkpoint(
            task_id, total_records, TaskStatus.COMPLETED
        )
        await context.info(f"✅ Processing completed: {len(processed_data)} records")
        return {
            "task_id": task_id,
            "dataset_name": dataset_name,
            "total_processed": len(processed_data),
            "records": processed_data[:5],  # Show first 5
            "status": "completed"
        }
    except Exception as e:
        # Mark as failed
        checkpoint_manager.update_checkpoint(
            task_id, record_num, TaskStatus.FAILED, error_message=str(e)
        )
        await context.error(f"❌ Error in processing: {e}")
        raise


@mcp.tool
async def pause_task(
    task_id: str,
    context: Context = None
) -> Dict[str, str]:
    """
    Pause a task in execution.

    Args:
        task_id: ID of the task to pause
    """
    checkpoint = checkpoint_manager.get_checkpoint(task_id)
    if not checkpoint:
        return {"error": f"Task {task_id} not found"}
    if checkpoint.status != TaskStatus.RUNNING:
        return {"error": f"Task {task_id} is not running"}
    success = checkpoint_manager.update_checkpoint(
        task_id, checkpoint.current_step, TaskStatus.PAUSED
    )
    if success:
        if context:
            await context.info(f"⏸️ Task {task_id} paused")
        return {"message": f"Task {task_id} paused correctly"}
    else:
        return {"error": f"Could not pause task {task_id}"}


@mcp.tool
async def list_session_tasks(
    client_id: Optional[str] = None,  # Persistent client_id
    context: Context = None
) -> Dict[str, Any]:
    """
    List all tasks of the current session or the specified client.

    Args:
        client_id: Persistent client id (optional)
    """
    if not context:
        return {"error": "Requires context"}
    # Use persistent client id if available, otherwise use session_id
    persistent_id = client_id if client_id else context.session_id
    if not persistent_id:
        return {"error": "Requires client_id or session_id"}
    checkpoints = checkpoint_manager.get_session_checkpoints(persistent_id)
    tasks = []
    for checkpoint in checkpoints:
        task_info = {
            "task_id": checkpoint.task_id,
            "task_name": checkpoint.task_name,
            "status": checkpoint.status.value,
            "progress": f"{checkpoint.current_step}/{checkpoint.total_steps}",
            "progress_percent": round((checkpoint.current_step / checkpoint.total_steps) * 100, 1) if checkpoint.total_steps > 0 else 0,
            "created_at": checkpoint.created_at.isoformat(),
            "updated_at": checkpoint.updated_at.isoformat()
        }
        tasks.append(task_info)
    return {
        "session_id": context.session_id,
        "persistent_id": persistent_id,
        "total_tasks": len(tasks),
        "tasks": tasks
    }


@mcp.tool
async def get_checkpoint_stats(context: Context = None) -> Dict[str, Any]:
    """
    Get global checkpoint statistics.
    """
    stats = checkpoint_manager.get_stats()
    return {
        "checkpoint_stats": stats,
        "total_checkpoints": sum(stats.values())
    }


@mcp.tool
async def get_session_info(context: Context = None) -> Dict[str, Any]:
    """
    Get information about the current session.
    """
    if not context:
        return {"error": "No context available"}
    return {
        "session_id": context.session_id,
        "request_id": context.request_id,
        "client_id": context.client_id
    }


async def run_resumable_server(host: str = "127.0.0.1", port: int = 8001):
    """Run server with resumability capabilities."""
    print(f"🚀 Starting MCP server with resumability at {host}:{port}")
    # Create application
    app = create_streamable_http_app(
        server=mcp,
        streamable_http_path="/mcp/",
        stateless_http=False,  # IMPORTANT: stateful for sessions
        debug=True
    )
    config = uvicorn.Config(
        app=app,
        host=host,
        port=port,
        log_level="info",
        access_log=False
    )
    server = uvicorn.Server(config)
    print(f"✅ Resumable server ready at http://{host}:{port}/mcp/")
    print("🔧 Tools with resumability:")
    print("   - resumable_data_processing: Processing with checkpoints")
    # print("   - large_file_download: Resumable download")
    # print("   - machine_learning_training: Resumable ML training")
    print("   - pause_task: Pause tasks")
    print("   - list_session_tasks: List session tasks")
    print("   - get_checkpoint_stats: Checkpoint statistics")
    print("   - get_session_info: Current session information")
    await server.serve()


if __name__ == "__main__":
    try:
        asyncio.run(run_resumable_server())
    except KeyboardInterrupt:
        print("\n⏹️ Server stopped by user")
    except Exception as e:
        print(f"❌ Error running server: {e}")

Writing MCP_resumable_server/server.py
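When `resumable_data_processing` resumes, it simply restarts its batch loop at the saved step: `range(start_record, total_records, batch_size)`, clamping the last batch with `min(...)`. The remaining batch ranges after a resume can be computed in isolation (a standalone sketch of that same range arithmetic, with a hypothetical `remaining_batches` helper):

```python
def remaining_batches(start_record: int, total_records: int, batch_size: int) -> list[tuple[int, int]]:
    """(start, end) pairs for the batches still to process after resuming at start_record."""
    return [(r, min(r + batch_size, total_records))
            for r in range(start_record, total_records, batch_size)]
```

So a task of 50 records with batch size 10 that was interrupted at step 20 has exactly three batches left, and a final batch shorter than `batch_size` is handled by the `min` clamp.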

Resumable MCP client

Here we will also have a client very similar to the one in the previous post, streamable MCP, but we will add a JSON file with information about the sessions created, so that if a session is interrupted in the middle of a process, it can be resumed from where it left off.
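The key to that JSON file is a client ID that survives restarts: generated once, written to disk, and read back on every later run so the server can match the new connection to the old checkpoints. A minimal standalone sketch of that persistence idea (a hypothetical `get_client_id` helper; the real client below stores more fields):

```python
import json
import uuid
from pathlib import Path

def get_client_id(state_file: Path) -> str:
    """Return the persistent client_id, creating the state file on first run."""
    if state_file.exists():
        # Later runs reuse the ID saved by the first run
        return json.loads(state_file.read_text())["client_id"]
    client_id = str(uuid.uuid4())[:16]  # first 16 characters, as the client below does
    state_file.write_text(json.dumps({"client_id": client_id}))
    return client_id
```

The first call generates and saves an ID; every later call returns the same one, which is what lets a restarted client resume the tasks keyed under that ID on the server.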

Implementing the resumable MCP client

Creating the virtual environment for the client

First we create the folder where we will develop it

	
!mkdir MCP_resumable_client

We initialize the project with uv

	
!cd MCP_resumable_client && uv init .

Initialized project `mcp-resumable-client` at `/Users/macm1/Documents/web/portafolio/posts/MCP_resumable_client`

We create the virtual environment

	
!cd MCP_resumable_client && uv venv

Using CPython 3.12.8
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate

We install the necessary libraries

	
!cd MCP_resumable_client && uv add fastmcp

Resolved 64 packages in 314ms
Installed 61 packages in 145ms
+ annotated-types==0.7.0
+ anyio==4.10.0
+ attrs==25.3.0
+ authlib==1.6.2
+ certifi==2025.8.3
+ cffi==1.17.1
+ charset-normalizer==3.4.3
+ click==8.2.1
+ cryptography==45.0.6
+ cyclopts==3.22.5
+ dnspython==2.7.0
+ docstring-parser==0.17.0
+ docutils==0.22
+ email-validator==2.2.0
+ exceptiongroup==1.3.0
+ fastmcp==2.11.3
+ h11==0.16.0
+ httpcore==1.0.9
+ httpx==0.28.1
+ httpx-sse==0.4.1
+ idna==3.10
+ isodate==0.7.2
+ jsonschema==4.25.1
+ jsonschema-path==0.3.4
+ jsonschema-specifications==2025.4.1
+ lazy-object-proxy==1.12.0
+ markdown-it-py==4.0.0
+ markupsafe==3.0.2
+ mcp==1.13.1
+ mdurl==0.1.2
+ more-itertools==10.7.0
+ openapi-core==0.19.5
+ openapi-pydantic==0.5.1
+ openapi-schema-validator==0.6.3
+ openapi-spec-validator==0.7.2
+ parse==1.20.2
+ pathable==0.4.4
+ pycparser==2.22
+ pydantic==2.11.7
+ pydantic-core==2.33.2
+ pydantic-settings==2.10.1
+ pygments==2.19.2
+ pyperclip==1.9.0
+ python-dotenv==1.1.1
+ python-multipart==0.0.20
+ pyyaml==6.0.2
+ referencing==0.36.2
+ requests==2.32.5
+ rfc3339-validator==0.1.4
+ rich==14.1.0
+ rich-rst==1.3.1
+ rpds-py==0.27.0
+ six==1.17.0
+ sniffio==1.3.1
+ sse-starlette==3.0.2
+ starlette==0.47.3
+ typing-extensions==4.14.1
+ typing-inspection==0.4.1
+ urllib3==2.5.0
+ uvicorn==0.35.0
+ werkzeug==3.1.1

Client code

Now we create the MCP client. It will also be very similar to the one in the previous post (last time I say that, I promise), streamable MCP, except that we also save sessions to a JSON file so we can recover any that were left halfway through a server process.

	
%%writefile MCP_resumable_client/client.py
#!/usr/bin/env python3
"""
MCP client with resumability and session persistence capabilities.
Demonstrates how to connect to a resumable server and handle interruptions.
"""
import asyncio
import json
import time
import uuid
from typing import Any, Dict, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport


@dataclass
class TaskProgress:
    """Represents the progress of a task."""
    task_id: str
    task_name: str
    current_step: int
    total_steps: int
    last_message: str
    started_at: datetime = field(default_factory=datetime.now)
    last_update: datetime = field(default_factory=datetime.now)

    @property
    def progress_percent(self) -> float:
        """Calculate progress percentage."""
        if self.total_steps == 0:
            return 0.0
        return (self.current_step / self.total_steps) * 100


@dataclass
class SessionState:
    """Session state of the client."""
    client_id: str  # Persistent client ID
    session_id: str  # Session ID of the server (can change)
    server_url: str
    active_tasks: Dict[str, TaskProgress]
    completed_tasks: List[str]
    created_at: datetime = field(default_factory=datetime.now)


class ResumableProgressHandler:
    """Handles progress with resumability capabilities."""

    def __init__(self, task_name: str, session_state: SessionState):
        self.task_name = task_name
        self.session_state = session_state
        self.start_time = time.time()
        self.last_progress = 0
        self.task_progress: Optional[TaskProgress] = None

    def __call__(self, progress: float, total: float, message: str):
        """Callback for progress updates."""
        # Create or update task progress
        if not self.task_progress:
            # Try to find task_id from the message
            task_id = self._extract_task_id(message)
            if not task_id:
                task_id = f"task_{int(time.time())}"
            self.task_progress = TaskProgress(
                task_id=task_id,
                task_name=self.task_name,
                current_step=int(progress),
                total_steps=int(total),
                last_message=message
            )
            self.session_state.active_tasks[task_id] = self.task_progress
        else:
            self.task_progress.current_step = int(progress)
            self.task_progress.total_steps = int(total)
            self.task_progress.last_message = message
            self.task_progress.last_update = datetime.now()
        # Display progress visually
        self._display_progress(progress, total, message)
        # Save session state
        self._save_session_state()

    def _extract_task_id(self, message: str) -> Optional[str]:
        """Try to extract task_id from the message."""
        # Search for patterns like "task_id: abc123" in the message
        import re
        match = re.search(r'task[_\s]*id[:\s]*([a-f0-9]{16})', message.lower())
        return match.group(1) if match else None

    def _display_progress(self, progress: float, total: float, message: str):
        """Display progress visually."""
        percentage = (progress / total) * 100 if total > 0 else 0
        bar_length = 30
        filled_length = int(bar_length * percentage / 100)
        bar = '█' * filled_length + '░' * (bar_length - filled_length)
        elapsed = time.time() - self.start_time
        print(f"\r📊 {self.task_name}: |{bar}| {percentage:.1f}% "
              f"({progress:.0f}/{total:.0f}) - {message} [{elapsed:.1f}s]",
              end='', flush=True)
        if progress >= total:
            print()  # New line when completed
            if self.task_progress:
                # Move to completed
                task_id = self.task_progress.task_id
                self.session_state.completed_tasks.append(task_id)
                if task_id in self.session_state.active_tasks:
                    del self.session_state.active_tasks[task_id]
                self._save_session_state()

    def _save_session_state(self):
        """Save session state."""
        # In a real case, this would be saved to disk
        pass


class MCPResumableClient:
    """MCP client with resumability capabilities."""

    def __init__(self, server_url: str = "http://localhost:8001/mcp/"):
        self.server_url = server_url
        self.transport = None
        self.client = None
        self.session_state: Optional[SessionState] = None
        self.state_file = Path("session_state.json")

    async def __aenter__(self):
        """Initialize connection and load state."""
        # Load previous state if it exists
        self._load_session_state()
        self.transport = StreamableHttpTransport(
            url=self.server_url,
            sse_read_timeout=120.0  # Timeout for resumable tasks
        )
        self.client = Client(transport=self.transport)
        await self.client.__aenter__()
        # Create new session state if it doesn't exist
        if not self.session_state:
            # Generate unique persistent client ID
            client_id = str(uuid.uuid4())[:16]  # Use only the first 16 characters
            # Get real session_id from the server
            try:
                session_info_result = await self.client.call_tool("get_session_info", {})
                # Extract content from CallToolResult
                if hasattr(session_info_result, 'content') and session_info_result.content:
                    session_info_text = session_info_result.content[0].text
                    if isinstance(session_info_text, str):
                        session_info = json.loads(session_info_text)
                    else:
                        session_info = session_info_text
                else:
                    session_info = {}
                real_session_id = session_info.get("session_id", f"session_{int(time.time())}")
                print("🏗️ Creating new session")
                print(f"🔗 Session ID of the server: {real_session_id}")
                print(f"🆔 Persistent Client ID: {client_id}")
            except Exception as e:
                print(f"⚠️ Could not get session_id from the server: {e}")
                real_session_id = f"session_{int(time.time())}"
            self.session_state = SessionState(
                client_id=client_id,
                session_id=real_session_id,
                server_url=self.server_url,
                active_tasks={},
                completed_tasks=[]
            )
            self._save_session_state()
        else:
            # Reuse existing client_id
            print(f"🆔 Reusing Persistent Client ID: {self.session_state.client_id}")
            # Verify if the session_id matches the server
            try:
                session_info_result = await self.client.call_tool("get_session_info", {})
                # Extract content from CallToolResult
                if hasattr(session_info_result, 'content') and session_info_result.content:
                    session_info_text = session_info_result.content[0].text
                    if isinstance(session_info_text, str):
                        session_info = json.loads(session_info_text)
                    else:
                        session_info = session_info_text
                else:
                    session_info = {}
                server_session_id = session_info.get("session_id")
                if server_session_id and server_session_id != self.session_state.session_id:
                    print(f"⚠️ Session ID of the server changed: {self.session_state.session_id} → {server_session_id}")
                    self.session_state.session_id = server_session_id
                    self._save_session_state()
            except Exception as e:
                print(f"⚠️ Could not verify session_id: {e}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close connection and save state."""
        if self.client:
            await self.client.__aexit__(exc_type, exc_val, exc_tb)
        self._save_session_state()

    def _load_session_state(self):
        """Load session state from disk."""
        if self.state_file.exists():
            try:
                with open(self.state_file, 'r') as f:
                    data = json.load(f)
                # Reconstruct objects
                active_tasks = {}
                for task_id, task_data in data.get('active_tasks', {}).items():
                    task_data['started_at'] = datetime.fromisoformat(task_data['started_at'])
                    task_data['last_update'] = datetime.fromisoformat(task_data['last_update'])
                    active_tasks[task_id] = TaskProgress(**task_data)
                self.session_state = SessionState(
                    client_id=data.get('client_id', str(uuid.uuid4())[:16]),  # Generate if not exists
                    session_id=data['session_id'],
                    server_url=data['server_url'],
                    active_tasks=active_tasks,
                    completed_tasks=data.get('completed_tasks', []),
                    created_at=datetime.fromisoformat(data['created_at'])
                )
                print(f"📂 Session state loaded: {self.session_state.session_id}")
            except Exception as e:
                print(f"⚠️ Error loading session state: {e}")
                self.session_state = None
        else:
            print("💬 No session state found, starting new session")
            self.session_state = None

    def _save_session_state(self):
        """Save session state to disk."""
        if not self.session_state:
            return
        try:
            # Convert to serializable format
            active_tasks = {}
            for task_id, task_progress in self.session_state.active_tasks.items():
                active_tasks[task_id] = {
                    'task_id': task_progress.task_id,
                    'task_name': task_progress.task_name,
                    'current_step': task_progress.current_step,
                    'total_steps': task_progress.total_steps,
                    'last_message': task_progress.last_message,
                    'started_at': task_progress.started_at.isoformat(),
                    'last_update': task_progress.last_update.isoformat()
                }
            data = {
                'client_id': self.session_state.client_id,
                'session_id': self.session_state.session_id,
                'server_url': self.session_state.server_url,
                'active_tasks': active_tasks,
                'completed_tasks': self.session_state.completed_tasks,
                'created_at': self.session_state.created_at.isoformat()
            }
            with open(self.state_file, 'w') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            print(f"⚠️ Error saving session state: {e}")

    async def test_connection(self) -> bool:
        """Test connection to the server."""
        try:
            await self.client.ping()
            print("✅ Connection established with the resumable server")
            return True
        except Exception as e:
            print(f"❌ Connection error: {e}")
            return False

    async def call_resumable_tool(
        self,
        tool_name: str,
        parameters: Dict[str, Any],
        allow_resume: bool = True
    ) -> Dict[str, Any]:
        """Call tool with resumability capabilities."""
        progress_handler = ResumableProgressHandler(tool_name, self.session_state)
        start_time = time.time()
        # Add client_id to the parameters if allow_resume is enabled
        if allow_resume and self.session_state:
            parameters = {**parameters, 'client_id': self.session_state.client_id}
        try:
            print(f"🚀 Executing {tool_name} (resumption: {'✅' if allow_resume else '❌'})")
            result = await self.client.call_tool(
                tool_name,
                parameters,
                progress_handler=progress_handler
            )
            duration = time.time() - start_time
            print(f"✅ {tool_name} completed in {duration:.2f}s")
            # Extract content from CallToolResult
            if hasattr(result, 'content') and result.content:
                if isinstance(result.content[0].text, str):
                    try:
                        return json.loads(result.content[0].text)
                    except json.JSONDecodeError:
                        return {"text": result.content[0].text}
                else:
                    return result.content[0].text
            else:
                return {"error": "No content in result"}
        except KeyboardInterrupt:
            print(f"\n⏸️ {tool_name} interrupted by the user")
            # In a real implementation, here we would pause the task
            raise
        except Exception as e:
            duration = time.time() - start_time
            print(f"❌ {tool_name} failed after {duration:.2f}s: {e}")
            raise

    async def list_session_tasks(self) -> Dict[str, Any]:
        """List session tasks."""
        try:
            parameters = {}
            if self.session_state:
                parameters['client_id'] = self.session_state.client_id
            result = await self.client.call_tool("list_session_tasks", parameters)
            # Extract content from CallToolResult
            if hasattr(result, 'content') and result.content:
                if isinstance(result.content[0].text, str):
                    try:
                        return json.loads(result.content[0].text)
                    except json.JSONDecodeError:
                        return {"text": result.content[0].text}
                else:
                    return result.content[0].text
            else:
                return {"error": "No content in result"}
        except Exception as e:
            print(f"❌ Error listing session tasks: {e}")
            return {"error": str(e)}

    async def pause_task(self, task_id: str) -> Dict[str, Any]:
        """Pause a specific task."""
        try:
            result = await self.client.call_tool("pause_task", {"task_id": task_id})
            # Extract content from CallToolResult
            if hasattr(result, 'content') and result.content:
                if isinstance(result.content[0].text, str):
                    try:
                        return json.loads(result.content[0].text)
                    except json.JSONDecodeError:
                        return {"text": result.content[0].text}
                else:
                    return result.content[0].text
            else:
                return {"error": "No content in result"}
        except Exception as e:
            print(f"❌ Error pausing task {task_id}: {e}")
            return {"error": str(e)}

    async def get_checkpoint_stats(self) -> Dict[str, Any]:
        """Get checkpoint statistics."""
        try:
            result = await self.client.call_tool("get_checkpoint_stats", {})
            # Extract content from CallToolResult
            if hasattr(result, 'content') and result.content:
                if isinstance(result.content[0].text, str):
                    try:
                        return json.loads(result.content[0].text)
                    except json.JSONDecodeError:
                        return {"text": result.content[0].text}
                else:
                    return result.content[0].text
            else:
                return {"error": "No content in result"}
        except Exception as e:
            print(f"❌ Error getting checkpoint statistics: {e}")
            return {"error": str(e)}

    def show_session_summary(self):
        """Show session summary."""
        if not self.session_state:
            print("❌ No session state available")
            return
        print("\n" + "="*60)
        print("📋 SESSION SUMMARY")
        print("="*60)
        print(f"🆔 Client ID (persistent): {self.session_state.client_id}")
        print(f"📌 Session ID (server): {self.session_state.session_id}")
        print(f"🔗 Server: {self.session_state.server_url}")
        print(f"📅 Created: {self.session_state.created_at.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"\n🔄 Active Tasks: {len(self.session_state.active_tasks)}")
        for task_id, task in self.session_state.active_tasks.items():
            progress = task.progress_percent
            print(f"  • {task.task_name} ({task_id[:8]}...): {progress:.1f}%")
            print(f"    └─ {task.last_message}")
        print(f"\n✅ Completed Tasks: {len(self.session_state.completed_tasks)}")
        for task_id in self.session_state.completed_tasks[-5:]:  # Last 5
            print(f"  • {task_id[:8]}...")


async def demo_resumable_data_processing(client: MCPResumableClient):
    """Demo of resumable data processing."""
    print("\n" + "="*60)
    print("📊 DEMO: Resumable Data Processing")
    print("="*60)
    result = await client.call_resumable_tool(
        "resumable_data_processing",
        {
            "dataset_name": "customer_data",
            "total_records": 50,
            "batch_size": 5
        }
    )
    if "error" not in result:
        print(f"📊 Result: {result.get('total_processed', 0)} records processed")
print(f"🆔 Task ID: {result.get('task_id', 'N/A')}")
async def interactive_demo(client: MCPResumableClient):
"""Interactive demo with options."""
while True:
print(" " + "="*60)
print("🎮 INTERACTIVE DEMO - Resumable Client")
print("="*60)
print("1. Resumable data processing")
print("2. List session tasks")
print("3. Checkpoint statistics")
print("4. Session summary")
print("0. Exit")
print("-" * 60)
try:
choice = input("Select an option (0-4): ").strip()
if choice == "0":
break
elif choice == "1":
await demo_resumable_data_processing(client)
elif choice == "2":
tasks = await client.list_session_tasks()
print(f"📋 Session tasks: {json.dumps(tasks, indent=2)}")
elif choice == "3":
stats = await client.get_checkpoint_stats()
print(f"📊 Checkpoint statistics: {json.dumps(stats, indent=2)}")
elif choice == "4":
client.show_session_summary()
else:
print("❌ Invalid option")
except KeyboardInterrupt:
print(" ⏸️ Demo interrupted")
break
except Exception as e:
print(f"❌ Error: {e}")
async def run_resumable_demo():
"""Run resumable demo."""
print("🌟 Resumable Client MCP - Demo")
print("="*60)
try:
async with MCPResumableClient() as client:
# Test connection
if not await client.test_connection():
print("❌ Could not connect to the resumable server.")
print(" Make sure the server is running on port 8001")
return
# Show session state
client.show_session_summary()
# Run interactive demo
await interactive_demo(client)
print(" 🎉 Demo completed")
except Exception as e:
print(f"❌ Error in the demo: {e}")
if __name__ == "__main__":
try:
asyncio.run(run_resumable_demo())
except KeyboardInterrupt:
print(" ⏹️ Demo interrupted by the user")
except Exception as e:
print(f"❌ Error running demo: {e}")
Copy
	
Overwriting MCP_resumable_client/client.py

Execution

We are going to start the server, launch the client, and ask the server for a task. Then we'll stop both the client and the server to simulate a lost connection, and finally restart the server and the client to resume the task where we left off.

First run - interrupted process

Before running anything, let's look at the contents of the server and client folders.

First, let's see what's in the server folder

	
!cd MCP_resumable_server && ls -lha
Copy
	
total 336
drwxr-xr-x@ 11 macm1 staff 352B Aug 25 10:26 .
drwxr-xr-x 112 macm1 staff 3.5K Aug 25 09:56 ..
-rw-r--r--@ 1 macm1 staff 5B Aug 25 09:37 .python-version
drwxr-xr-x@ 8 macm1 staff 256B Aug 25 09:38 .venv
-rw-r--r--@ 1 macm1 staff 0B Aug 25 09:37 README.md
drwxr-xr-x@ 3 macm1 staff 96B Aug 25 09:42 __pycache__
-rw-r--r--@ 1 macm1 staff 8.4K Aug 25 09:40 checkpoint_manager.py
-rw-r--r--@ 1 macm1 staff 98B Aug 25 09:37 main.py
-rw-r--r--@ 1 macm1 staff 213B Aug 25 09:38 pyproject.toml
-rw-r--r--@ 1 macm1 staff 9.2K Aug 25 09:42 server.py
-rw-r--r--@ 1 macm1 staff 128K Aug 25 09:38 uv.lock

We have the files .python-version, .venv, README.md, __pycache__, main.py, pyproject.toml and uv.lock, which were created by uv when setting up the virtual environment. We also have checkpoint_manager.py and server.py, the files we wrote for the resumable MCP server.

Now let's see what files are in the client folder

	
!cd MCP_resumable_client && ls -lha
Copy
	
total 336
drwxr-xr-x@ 9 macm1 staff 288B Aug 25 10:26 .
drwxr-xr-x 112 macm1 staff 3.5K Aug 25 09:56 ..
-rw-r--r--@ 1 macm1 staff 5B Aug 25 09:56 .python-version
drwxr-xr-x@ 8 macm1 staff 256B Aug 25 09:57 .venv
-rw-r--r--@ 1 macm1 staff 0B Aug 25 09:56 README.md
-rw-r--r--@ 1 macm1 staff 20K Aug 25 10:24 client.py
-rw-r--r--@ 1 macm1 staff 98B Aug 25 09:56 main.py
-rw-r--r--@ 1 macm1 staff 190B Aug 25 09:57 pyproject.toml
-rw-r--r--@ 1 macm1 staff 128K Aug 25 09:57 uv.lock

As before, we have the files .python-version, .venv, README.md, main.py, pyproject.toml and uv.lock created by uv when setting up the virtual environment, plus client.py, the file we wrote for the resumable MCP client.

With that checked, we start the server

	
!cd MCP_resumable_server && source .venv/bin/activate && uv run server.py
Copy
	
🚀 Starting MCP server with resumability at 127.0.0.1:8001
✅ Resumable server ready at http://127.0.0.1:8001/mcp/
🔧 Tools with resumability:
- resumable_data_processing: Processing with checkpoints
- pause_task: Pause tasks
- list_session_tasks: List session tasks
- get_checkpoint_stats: Checkpoint statistics
- get_session_info: Current session information
INFO: Started server process [47049]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8001 (Press CTRL+C to quit)

Now we run the client, select option 1 (Resumable data processing) and stop it halfway through

	
!cd MCP_resumable_client && source .venv/bin/activate && uv run client.py
Copy
	
🌟 Resumable Client MCP - Demo
============================================================
💬 No session state found, starting new session
🏗️ Creating new session
🔗 Session ID of the server: 6bf64139282e41e0a2233ec92647bf02
🆔 Persistent Client ID: f0af843a-9b0e-42
✅ Connection established with the resumable server
============================================================
📋 SESSION SUMMARY
============================================================
🆔 Client ID (persistent): f0af843a-9b0e-42
📌 Session ID (server): 6bf64139282e41e0a2233ec92647bf02
🔗 Server: http://localhost:8001/mcp/
📅 Created: 2025-08-25 10:34:53
🔄 Active Tasks: 0
✅ Completed Tasks: 0
============================================================
🎮 INTERACTIVE DEMO - Resumable Client
============================================================
1. Resumable data processing
2. List session tasks
3. Checkpoint statistics
4. Session summary
0. Exit
------------------------------------------------------------
Select an option (0-4): 1
============================================================
📊 DEMO: Resumable Data Processing
============================================================
🚀 Executing resumable_data_processing (resumption: ✅)
[08/25/25 10:35:19] INFO Server log: logging.py:40
🆕 Starting new processing of 50 records
📊 resumable_data_processing: |███░░░░░░░░░░░░░░░░░░░░░░░░░░░| 10.0% (5/50) - Processed 5/50 records [1.0s]
📊 resumable_data_processing: |██████░░░░░░░░░░░░░░░░░░░░░░░░| 20.0% (10/50) - Processed 10/50 records [2.0s]
📊 resumable_data_processing: |█████████░░░░░░░░░░░░░░░░░░░░░| 30.0% (15/50) - Processed 15/50 records [3.0s]
^C
⏹️ Demo interrupted by the user

We stop the server as well.

Now that both the server and the client are stopped, let's look at each folder again.

First, the server folder

	
!cd MCP_resumable_server && ls -lha
Copy
	
total 336
drwxr-xr-x@ 12 macm1 staff 384B Aug 25 10:33 .
drwxr-xr-x 112 macm1 staff 3.5K Aug 25 09:56 ..
-rw-r--r--@ 1 macm1 staff 5B Aug 25 09:37 .python-version
drwxr-xr-x@ 8 macm1 staff 256B Aug 25 09:38 .venv
-rw-r--r--@ 1 macm1 staff 0B Aug 25 09:37 README.md
drwxr-xr-x@ 3 macm1 staff 96B Aug 25 09:42 __pycache__
-rw-r--r--@ 1 macm1 staff 8.4K Aug 25 09:40 checkpoint_manager.py
drwxr-xr-x@ 4 macm1 staff 128B Aug 25 10:35 checkpoints
-rw-r--r--@ 1 macm1 staff 98B Aug 25 09:37 main.py
-rw-r--r--@ 1 macm1 staff 213B Aug 25 09:38 pyproject.toml
-rw-r--r--@ 1 macm1 staff 9.2K Aug 25 09:42 server.py
-rw-r--r--@ 1 macm1 staff 128K Aug 25 09:38 uv.lock

We can see that, in addition to everything that was there before, there is now a folder called checkpoints containing the checkpoints of the tasks that have been run.

	
!cd MCP_resumable_server/checkpoints && ls -lha
Copy
	
total 8
drwxr-xr-x@ 4 macm1 staff 128B Aug 25 10:35 .
drwxr-xr-x@ 12 macm1 staff 384B Aug 25 10:33 ..
-rw-r--r--@ 1 macm1 staff 1.1K Aug 25 10:35 checkpoints.json
drwxr-xr-x@ 3 macm1 staff 96B Aug 25 10:35 data

If we look at the contents of the JSON we can see

  "current_step": 15,
"total_steps": 50,
	
!cd MCP_resumable_server/checkpoints && cat checkpoints.json
Copy
	
{
"9d7bc504c50e0bce": {
"task_id": "9d7bc504c50e0bce",
"session_id": "864eec3a-be1b-43",
"task_name": "resumable_data_processing",
"parameters": {
"dataset_name": "customer_data",
"total_records": 50,
"batch_size": 5
},
"current_step": 15,
"total_steps": 50,
"status": "failed",
"created_at": "2025-08-25T08:35:19.349142+00:00",
"updated_at": "2025-08-25T08:35:23.366672+00:00",
"data": {
"last_batch": [
{
"record_id": "customer_data_000015",
"processed_at": "step_15",
"value": 88
},
{
"record_id": "customer_data_000016",
"processed_at": "step_16",
"value": 17
},
{
"record_id": "customer_data_000017",
"processed_at": "step_17",
"value": 596
},
{
"record_id": "customer_data_000018",
"processed_at": "step_18",
"value": 693
},
{
"record_id": "customer_data_000019",
"processed_at": "step_19",
"value": 34
}
]
},
"error_message": ""
}
}

That is, we stopped at step 15 of 50, which, if you look at the client output, is exactly where we interrupted the process.

📊 resumable_data_processing: |███░░░░░░░░░░░░░░░░░░░░░░░░░░░| 10.0% (5/50) - Processed 5/50 records [1.0s]
📊 resumable_data_processing: |██████░░░░░░░░░░░░░░░░░░░░░░░░| 20.0% (10/50) - Processed 10/50 records [2.0s]
📊 resumable_data_processing: |█████████░░░░░░░░░░░░░░░░░░░░░| 30.0% (15/50) - Processed 15/50 records [3.0s]
^C

Now let's look at the client folder; we can see that there is now a file called session_state.json

	
!cd MCP_resumable_client && ls -lha
Copy
	
total 344
drwxr-xr-x@ 10 macm1 staff 320B Aug 25 10:35 .
drwxr-xr-x 112 macm1 staff 3.5K Aug 25 09:56 ..
-rw-r--r--@ 1 macm1 staff 5B Aug 25 09:56 .python-version
drwxr-xr-x@ 8 macm1 staff 256B Aug 25 09:57 .venv
-rw-r--r--@ 1 macm1 staff 0B Aug 25 09:56 README.md
-rw-r--r--@ 1 macm1 staff 20K Aug 25 10:24 client.py
-rw-r--r--@ 1 macm1 staff 98B Aug 25 09:56 main.py
-rw-r--r--@ 1 macm1 staff 190B Aug 25 09:57 pyproject.toml
-rw-r--r--@ 1 macm1 staff 546B Aug 25 10:35 session_state.json
-rw-r--r--@ 1 macm1 staff 128K Aug 25 09:57 uv.lock

Let's see what's inside

	
!cd MCP_resumable_client && cat session_state.json
Copy
	
{
"client_id": "864eec3a-be1b-43",
"session_id": "d34ff655d6464fb4ae5eeb64c62989b0",
"server_url": "http://localhost:8001/mcp/",
"active_tasks": {
"task_1756110920": {
"task_id": "task_1756110920",
"task_name": "resumable_data_processing",
"current_step": 15,
"total_steps": 50,
"last_message": "Processed 15/50 records",
"started_at": "2025-08-25T10:35:20.353521",
"last_update": "2025-08-25T10:35:22.364507"
}
},
"completed_tasks": [],
"created_at": "2025-08-25T10:35:16.230926"
}

Just like in the server's JSON, we see

   "current_step": 15,
"total_steps": 50,

It has also recorded that we stopped at step 15 of 50, but the most important part is

      "task_id": "task_1756110920",

where we store the task ID so we can ask the server to resume it.
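A minimal sketch of how a client can persist this state and recover the persistent ID across restarts (hypothetical, simplified names; not the exact client.py code):

```python
import json
import uuid
from dataclasses import dataclass, field, asdict
from pathlib import Path

# Same file name the client uses on disk
STATE_FILE = Path("session_state.json")


@dataclass
class MiniSessionState:
    """Minimal sketch of the client's persisted session state (simplified)."""
    client_id: str = field(default_factory=lambda: str(uuid.uuid4())[:16])
    active_tasks: dict = field(default_factory=dict)

    def save(self, path: Path = STATE_FILE) -> None:
        # Serialize the dataclass to JSON so the next run can pick it up
        path.write_text(json.dumps(asdict(self), indent=2))

    @classmethod
    def load(cls, path: Path = STATE_FILE) -> "MiniSessionState":
        # Reuse the persistent client_id if a previous session saved one
        if path.exists():
            return cls(**json.loads(path.read_text()))
        return cls()
```

The key design choice is that client_id is generated once and then always reloaded from disk, so the server can match new connections to old checkpoints even though the transport session ID changes on every reconnect.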

Second run - resumed process

If we now start the server again

	
!cd MCP_resumable_server && source .venv/bin/activate && uv run server.py
Copy
	
🚀 Starting MCP server with resumability at 127.0.0.1:8001
✅ Resumable server ready at http://127.0.0.1:8001/mcp/
🔧 Tools with resumability:
- resumable_data_processing: Processing with checkpoints
- pause_task: Pause tasks
- list_session_tasks: List session tasks
- get_checkpoint_stats: Checkpoint statistics
- get_session_info: Current session information
INFO: Started server process [52338]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8001 (Press CTRL+C to quit)

And run the client again

	
!cd MCP_resumable_client && source .venv/bin/activate && uv run client.py
Copy
	
🌟 Resumable Client MCP - Demo
============================================================
📂 Session state loaded: d34ff655d6464fb4ae5eeb64c62989b0
🆔 Reusing Persistent Client ID: 864eec3a-be1b-43
⚠️ Session ID of the server changed: d34ff655d6464fb4ae5eeb64c62989b0 → 004bc91edde144898938136d0d2b5041
✅ Connection established with the resumable server
============================================================
📋 SESSION SUMMARY
============================================================
🆔 Client ID (persistent): 864eec3a-be1b-43
📌 Session ID (server): 004bc91edde144898938136d0d2b5041
🔗 Server: http://localhost:8001/mcp/
📅 Created: 2025-08-25 10:35:16
🔄 Active Tasks: 1
• resumable_data_processing (task_175...): 30.0%
└─ Processed 15/50 records
✅ Completed Tasks: 0
============================================================
🎮 INTERACTIVE DEMO - Resumable Client
============================================================
1. Resumable data processing
2. List session tasks
3. Checkpoint statistics
4. Session summary
0. Exit
------------------------------------------------------------
Select an option (0-4):

We can see that it says

🔄 Active Tasks: 1
• resumable_data_processing (task_175...): 30.0%
└─ Processed 15/50 records

It has recovered the fact that we stopped at step 15 of 50 of the task with ID task_1756110920.

Now we select option 1 again and we'll see that it continues from step 15

	
!cd MCP_resumable_client && source .venv/bin/activate && uv run client.py
Copy
	
🌟 Resumable Client MCP - Demo
============================================================
📂 Session state loaded: 004bc91edde144898938136d0d2b5041
🆔 Reusing Persistent Client ID: 864eec3a-be1b-43
⚠️ Session ID of the server changed: 004bc91edde144898938136d0d2b5041 → 4b93c36446504a3d92467d39a8cba589
✅ Connection established with the resumable server
============================================================
📋 SESSION SUMMARY
============================================================
🆔 Client ID (persistent): 864eec3a-be1b-43
📌 Session ID (server): 4b93c36446504a3d92467d39a8cba589
🔗 Server: http://localhost:8001/mcp/
📅 Created: 2025-08-25 10:35:16
🔄 Active Tasks: 1
• resumable_data_processing (task_175...): 30.0%
└─ Processed 15/50 records
✅ Completed Tasks: 0
============================================================
🎮 INTERACTIVE DEMO - Resumable Client
============================================================
1. Resumable data processing
2. List session tasks
3. Checkpoint statistics
4. Session summary
0. Exit
------------------------------------------------------------
Select an option (0-4): 1
============================================================
📊 DEMO: Resumable Data Processing
============================================================
🚀 Executing resumable_data_processing (resumption: ✅)
[08/25/25 10:57:31] INFO Server log: 🔄 Resuming logging.py:40
processing from record 15
📊 resumable_data_processing: |████████████░░░░░░░░░░░░░░░░░░| 40.0% (20/50) - Processed 20/50 records [1.0s]
📊 resumable_data_processing: |███████████████░░░░░░░░░░░░░░░| 50.0% (25/50) - Processed 25/50 records [2.0s]
📊 resumable_data_processing: |██████████████████░░░░░░░░░░░░| 60.0% (30/50) - Processed 30/50 records [3.0s]
📊 resumable_data_processing: |█████████████████████░░░░░░░░░| 70.0% (35/50) - Processed 35/50 records [4.0s]
📊 resumable_data_processing: |████████████████████████░░░░░░| 80.0% (40/50) - Processed 40/50 records [5.0s]
📊 resumable_data_processing: |███████████████████████████░░░| 90.0% (45/50) - Processed 45/50 records [6.0s]
📊 resumable_data_processing: |██████████████████████████████| 100.0% (50/50) - Processed 50/50 records [7.0s]
[08/25/25 10:57:38] INFO Server log: ✅ Processing logging.py:40
completed: 55 records
✅ resumable_data_processing completed in 7.05s
📊 Result: 55 records processed
🆔 Task ID: 88e68f3bdf65cd22
============================================================
🎮 INTERACTIVE DEMO - Resumable Client
============================================================
1. Resumable data processing
2. List session tasks
3. Checkpoint statistics
4. Session summary
0. Exit
------------------------------------------------------------
Select an option (0-4): 0
🎉 Demo completed

We can see it says processing from record 15, meaning it has picked up right where it left off.

With this, we can now build MCP servers and clients that handle long-running tasks and can resume them if they are interrupted.

This is also very useful for very long tasks: the client doesn't need to stay running and waiting the whole time. Instead, it can ask the server to start the task and then periodically query the task's status.
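That fire-and-poll pattern could be sketched like this (a hedged sketch; `get_status` stands in for whatever MCP tool reports progress, such as our list_session_tasks):

```python
import asyncio


async def poll_until_done(get_status, task_id: str, interval_s: float = 5.0) -> dict:
    """Poll a status callable until the task reports completion (sketch).

    `get_status` stands in for an MCP tool call that returns a dict
    with 'current_step' and 'total_steps' for the given task.
    """
    while True:
        status = await get_status(task_id)
        if status["current_step"] >= status["total_steps"]:
            return status
        # Sleep between polls instead of holding a connection open
        await asyncio.sleep(interval_s)
```

The client can even exit between polls: since the task ID lives in session_state.json and the progress lives in the server's checkpoints, any later run can ask for the status again.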
