Resumable MCP: How to Build Servers and Clients with Automatic Checkpoints


In the previous post, streamable MCP, we saw how to make the MCP server send information about the process it is running so that the client can display it to the user.

This is useful when the process performed by the MCP server is long.

But what happens if the process is interrupted, for example because the server crashes or the connection is lost? We would have to ask the MCP server to start the process again from the beginning.

To avoid that, we will see how to create an MCP server and client that can resume a process in progress. That way, if it is interrupted for any reason, it can continue from where it left off.

Resumable MCP server

This server is very similar to the one we built in the streamable MCP post, except that it also creates checkpoints so that a process can be resumed if it is interrupted.
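Before going into the real implementation, the idea of a checkpoint can be illustrated with a minimal sketch that uses only the standard library. It is not the code of this post: the function `run_with_checkpoint`, the file name `demo_checkpoint.json` and the `fail_at` parameter are assumptions made up for the example. After each step the loop writes the next step to run, so a later call starts there instead of at zero.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


def run_with_checkpoint(checkpoint_file: Path, total_steps: int, fail_at: Optional[int] = None) -> int:
    """Run a step loop, saving the next step to execute after each completed step.

    Returns the step the run started from (0 for a fresh run).
    """
    # Resume from the saved step if a checkpoint exists, otherwise start at 0
    start = 0
    if checkpoint_file.exists():
        start = json.loads(checkpoint_file.read_text())["next_step"]

    for step in range(start, total_steps):
        if fail_at is not None and step == fail_at:
            # Hypothetical failure used only to simulate an interruption
            raise RuntimeError(f"simulated crash at step {step}")
        # ... the real work for this step would go here ...
        checkpoint_file.write_text(json.dumps({"next_step": step + 1}))
    return start


with tempfile.TemporaryDirectory() as tmp:
    ckpt = Path(tmp) / "demo_checkpoint.json"
    try:
        run_with_checkpoint(ckpt, total_steps=10, fail_at=4)  # interrupted run
    except RuntimeError:
        pass
    resumed_from = run_with_checkpoint(ckpt, total_steps=10)  # second run resumes
    print(f"Second run resumed from step {resumed_from}")
```

The server in this post follows the same pattern, with the difference that the checkpoint is keyed by client and task so that several tasks can be tracked at once.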

So let's see how to implement it.

Implement resumable MCP server

Create virtual environment

First, we create the folder where we are going to develop it.

	
!mkdir MCP_resumable_server

We initialize the project with uv

	
!cd MCP_resumable_server && uv init .
	
Initialized project `mcp-resumable-server` at `/Users/macm1/Documents/web/portafolio/posts/MCP_resumable_server`

We create the virtual environment

	
!cd MCP_resumable_server && uv venv
	
Using CPython 3.12.8
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate

We install the necessary libraries

	
!cd MCP_resumable_server && uv add fastmcp uvicorn
	
Resolved 64 packages in 548ms
Prepared 1 package in 134ms
Installed 61 packages in 152ms
+ annotated-types==0.7.0
+ anyio==4.10.0
+ attrs==25.3.0
+ authlib==1.6.2
+ certifi==2025.8.3
+ cffi==1.17.1
+ charset-normalizer==3.4.3
+ click==8.2.1
+ cryptography==45.0.6
+ cyclopts==3.22.5
+ dnspython==2.7.0
+ docstring-parser==0.17.0
+ docutils==0.22
+ email-validator==2.2.0
+ exceptiongroup==1.3.0
+ fastmcp==2.11.3
+ h11==0.16.0
+ httpcore==1.0.9
+ httpx==0.28.1
+ httpx-sse==0.4.1
+ idna==3.10
+ isodate==0.7.2
+ jsonschema==4.25.1
+ jsonschema-path==0.3.4
+ jsonschema-specifications==2025.4.1
+ lazy-object-proxy==1.12.0
+ markdown-it-py==4.0.0
+ markupsafe==3.0.2
+ mcp==1.13.1
+ mdurl==0.1.2
+ more-itertools==10.7.0
+ openapi-core==0.19.5
+ openapi-pydantic==0.5.1
+ openapi-schema-validator==0.6.3
+ openapi-spec-validator==0.7.2
+ parse==1.20.2
+ pathable==0.4.4
+ pycparser==2.22
+ pydantic==2.11.7
+ pydantic-core==2.33.2
+ pydantic-settings==2.10.1
+ pygments==2.19.2
+ pyperclip==1.9.0
+ python-dotenv==1.1.1
+ python-multipart==0.0.20
+ pyyaml==6.0.2
+ referencing==0.36.2
+ requests==2.32.5
+ rfc3339-validator==0.1.4
+ rich==14.1.0
+ rich-rst==1.3.1
+ rpds-py==0.27.0
+ six==1.17.0
+ sniffio==1.3.1
+ sse-starlette==3.0.2
+ starlette==0.47.3
+ typing-extensions==4.14.1
+ typing-inspection==0.4.1
+ urllib3==2.5.0
+ uvicorn==0.35.0
+ werkzeug==3.1.1

Server code

Now we write the necessary code for the resumable MCP server.

Checkpoint manager

As mentioned, the main difference from the server in the streamable MCP post is that this one saves its state at checkpoints so that a process can be resumed if it is interrupted. So let's create the checkpoint manager.

	
%%writefile MCP_resumable_server/checkpoint_manager.py
#!/usr/bin/env python3
"""
Checkpoint and resumability system for MCP streaming tasks.
Allows saving the state of long tasks and resuming them from where they were interrupted.
"""
import json
import pickle
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, asdict
from enum import Enum


class TaskStatus(Enum):
    """Task states."""
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


@dataclass
class TaskCheckpoint:
    """Represents a checkpoint of a task."""
    task_id: str
    session_id: str
    task_name: str
    parameters: Dict[str, Any]
    current_step: int
    total_steps: int
    status: TaskStatus
    created_at: datetime
    updated_at: datetime
    data: Dict[str, Any]  # Task-specific state
    error_message: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to serializable dictionary."""
        data = asdict(self)
        data['status'] = data['status'].value
        data['created_at'] = data['created_at'].isoformat()
        data['updated_at'] = data['updated_at'].isoformat()
        return data

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'TaskCheckpoint':
        """Create from dictionary."""
        data = dict(data)  # Copy so the caller's dictionary is not modified
        data['status'] = TaskStatus(data['status'])
        data['created_at'] = datetime.fromisoformat(data['created_at'])
        data['updated_at'] = datetime.fromisoformat(data['updated_at'])
        return cls(**data)


class CheckpointManager:
    """Checkpoint manager for streaming tasks."""

    def __init__(self, storage_dir: str = "checkpoints"):
        self.storage_dir = Path(storage_dir)
        self.storage_dir.mkdir(exist_ok=True)
        self.checkpoints_file = self.storage_dir / "checkpoints.json"
        self.data_dir = self.storage_dir / "data"
        self.data_dir.mkdir(exist_ok=True)
        self._checkpoints: Dict[str, TaskCheckpoint] = {}
        self._load_checkpoints()

    def _load_checkpoints(self):
        """Load checkpoints from disk."""
        if self.checkpoints_file.exists():
            try:
                with open(self.checkpoints_file, 'r') as f:
                    data = json.load(f)
                self._checkpoints = {
                    task_id: TaskCheckpoint.from_dict(checkpoint_data)
                    for task_id, checkpoint_data in data.items()
                }
            except Exception as e:
                print(f"❌ Error loading checkpoints: {e}")
                self._checkpoints = {}

    def _save_checkpoints(self):
        """Save checkpoints to disk."""
        try:
            data = {
                task_id: checkpoint.to_dict()
                for task_id, checkpoint in self._checkpoints.items()
            }
            with open(self.checkpoints_file, 'w') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            print(f"❌ Error saving checkpoints: {e}")

    def generate_task_id(self, session_id: str, task_name: str, parameters: Dict[str, Any]) -> str:
        """Generate unique ID for a task."""
        # Create hash based on session, task and parameters
        data = f"{session_id}-{task_name}-{json.dumps(parameters, sort_keys=True)}"
        return hashlib.sha256(data.encode()).hexdigest()[:16]

    def create_checkpoint(
        self,
        session_id: str,
        task_name: str,
        parameters: Dict[str, Any],
        total_steps: int,
        initial_data: Optional[Dict[str, Any]] = None
    ) -> str:
        """Create new checkpoint."""
        task_id = self.generate_task_id(session_id, task_name, parameters)
        checkpoint = TaskCheckpoint(
            task_id=task_id,
            session_id=session_id,
            task_name=task_name,
            parameters=parameters,
            current_step=0,
            total_steps=total_steps,
            status=TaskStatus.PENDING,
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
            data=initial_data or {}
        )
        self._checkpoints[task_id] = checkpoint
        self._save_checkpoints()
        return task_id

    def update_checkpoint(
        self,
        task_id: str,
        current_step: int,
        status: TaskStatus,
        data: Optional[Dict[str, Any]] = None,
        error_message: Optional[str] = None
    ) -> bool:
        """Update existing checkpoint."""
        if task_id not in self._checkpoints:
            return False
        checkpoint = self._checkpoints[task_id]
        checkpoint.current_step = current_step
        checkpoint.status = status
        checkpoint.updated_at = datetime.now(timezone.utc)
        if data is not None:
            checkpoint.data.update(data)
        if error_message is not None:
            checkpoint.error_message = error_message
        self._save_checkpoints()
        return True

    def get_checkpoint(self, task_id: str) -> Optional[TaskCheckpoint]:
        """Get checkpoint by ID."""
        return self._checkpoints.get(task_id)

    def find_resumable_task(
        self,
        session_id: str,
        task_name: str,
        parameters: Dict[str, Any]
    ) -> Optional[TaskCheckpoint]:
        """Find resumable task with the same parameters."""
        task_id = self.generate_task_id(session_id, task_name, parameters)
        checkpoint = self._checkpoints.get(task_id)
        if checkpoint and checkpoint.status in [TaskStatus.PAUSED, TaskStatus.RUNNING]:
            return checkpoint
        return None

    def get_session_checkpoints(self, session_id: str) -> List[TaskCheckpoint]:
        """Get all checkpoints of a session."""
        return [
            checkpoint for checkpoint in self._checkpoints.values()
            if checkpoint.session_id == session_id
        ]

    def save_task_data(self, task_id: str, data: Any) -> bool:
        """Save specific task data."""
        try:
            data_file = self.data_dir / f"{task_id}.pkl"
            with open(data_file, 'wb') as f:
                pickle.dump(data, f)
            return True
        except Exception as e:
            print(f"❌ Error saving task data {task_id}: {e}")
            return False

    def load_task_data(self, task_id: str) -> Any:
        """Load specific task data."""
        try:
            data_file = self.data_dir / f"{task_id}.pkl"
            if data_file.exists():
                with open(data_file, 'rb') as f:
                    return pickle.load(f)
            return None
        except Exception as e:
            print(f"❌ Error loading task data {task_id}: {e}")
            return None

    def delete_checkpoint(self, task_id: str) -> bool:
        """Delete checkpoint and associated data."""
        try:
            # Delete checkpoint
            if task_id in self._checkpoints:
                del self._checkpoints[task_id]
                self._save_checkpoints()
            # Delete data
            data_file = self.data_dir / f"{task_id}.pkl"
            if data_file.exists():
                data_file.unlink()
            return True
        except Exception as e:
            print(f"❌ Error deleting checkpoint {task_id}: {e}")
            return False

    def cleanup_old_checkpoints(self, max_age_days: int = 7) -> int:
        """Clean up old checkpoints."""
        # Compare POSIX timestamps so the cutoff is valid on any day of the month
        # (replacing the day field fails when day - max_age_days is less than 1)
        cutoff_ts = datetime.now(timezone.utc).timestamp() - max_age_days * 86400
        to_delete = []
        for task_id, checkpoint in self._checkpoints.items():
            if (checkpoint.status in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED]
                    and checkpoint.updated_at.timestamp() < cutoff_ts):
                to_delete.append(task_id)
        for task_id in to_delete:
            self.delete_checkpoint(task_id)
        return len(to_delete)

    def get_stats(self) -> Dict[str, int]:
        """Get checkpoint statistics."""
        stats = {status.value: 0 for status in TaskStatus}
        for checkpoint in self._checkpoints.values():
            stats[checkpoint.status.value] += 1
        return stats


# Global singleton for the checkpoint manager
checkpoint_manager = CheckpointManager()
	
Writing MCP_resumable_server/checkpoint_manager.py
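
The point that makes resumption work is that the task ID is deterministic: the same owner, task name and parameters always produce the same ID, so a repeated call finds the earlier checkpoint. The following sketch reproduces only the hashing step of `generate_task_id` as a standalone function; the owner ID `client-abc` and the parameter values are assumptions for the example.

```python
import hashlib
import json
from typing import Any, Dict


def generate_task_id(owner_id: str, task_name: str, parameters: Dict[str, Any]) -> str:
    """Hash owner, task name and parameters into a 16-character hexadecimal ID."""
    # sort_keys=True makes the JSON text independent of the key order
    payload = f"{owner_id}-{task_name}-{json.dumps(parameters, sort_keys=True)}"
    return hashlib.sha256(payload.encode()).hexdigest()[:16]


# Same parameters in a different key order: same ID
first = generate_task_id("client-abc", "resumable_data_processing",
                         {"total_records": 50, "batch_size": 5})
second = generate_task_id("client-abc", "resumable_data_processing",
                          {"batch_size": 5, "total_records": 50})
# A different parameter value: different ID, so it is treated as a new task
other = generate_task_id("client-abc", "resumable_data_processing",
                         {"total_records": 50, "batch_size": 10})

print(first == second, first == other)
```

A consequence of this design is that changing any parameter, even `batch_size`, starts a new task instead of resuming the old one.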

Server

Now let's create the server code.

	
%%writefile MCP_resumable_server/server.py
#!/usr/bin/env python3
"""
MCP server with resumability and checkpoints.
Demonstrates how to implement tasks that can be paused and resumed.
"""
import asyncio
import uvicorn
import random
from typing import Dict, List, Optional, Any
from fastmcp import FastMCP, Context
from fastmcp.server.http import create_streamable_http_app
from checkpoint_manager import checkpoint_manager, TaskStatus, TaskCheckpoint

# Create the MCP server instance
mcp = FastMCP(
    name="Resumable Server",
    instructions="MCP server with resumability and checkpoints for long tasks"
)


@mcp.tool
async def resumable_data_processing(
    dataset_name: str = "default_dataset",
    total_records: int = 100,
    batch_size: int = 10,
    client_id: Optional[str] = None,  # Persistent client ID
    context: Context = None
) -> Dict[str, Any]:
    """
    Resumable data processing with automatic checkpoints.

    Args:
        dataset_name: Name of the dataset to process
        total_records: Total records to process
        batch_size: Size of the batch per iteration
        client_id: Persistent client id for resumability
    """
    if not context:
        return {"error": "Requires context for resumability"}
    # Use persistent client id if available, otherwise use session_id
    persistent_id = client_id if client_id else context.session_id
    if not persistent_id:
        return {"error": "Requires client_id or session_id for resumability"}
    task_name = "resumable_data_processing"
    parameters = {
        "dataset_name": dataset_name,
        "total_records": total_records,
        "batch_size": batch_size
    }
    # Search for existing task to resume - include interrupted tasks
    task_id = checkpoint_manager.generate_task_id(persistent_id, task_name, parameters)
    existing_checkpoint = checkpoint_manager.get_checkpoint(task_id)
    if existing_checkpoint and existing_checkpoint.current_step > 0:
        # Resume from checkpoint
        await context.info(f"🔄 Resuming processing from record {existing_checkpoint.current_step}")
        start_record = existing_checkpoint.current_step
        processed_data = checkpoint_manager.load_task_data(task_id) or []
        # Mark as running
        checkpoint_manager.update_checkpoint(
            task_id, start_record, TaskStatus.RUNNING
        )
    else:
        # New task
        await context.info(f"🆕 Starting new processing of {total_records} records")
        if not existing_checkpoint:
            task_id = checkpoint_manager.create_checkpoint(
                persistent_id, task_name, parameters, total_records
            )
        start_record = 0
        processed_data = []
        # Mark as running
        checkpoint_manager.update_checkpoint(
            task_id, 0, TaskStatus.RUNNING
        )
    try:
        # Process in batches
        for record_num in range(start_record, total_records, batch_size):
            batch_end = min(record_num + batch_size, total_records)
            # Simulate batch processing
            await asyncio.sleep(1)  # Simulate work
            # Process batch
            batch_results = []
            for i in range(record_num, batch_end):
                # Simulate record processing
                result = {
                    "record_id": f"{dataset_name}_{i:06d}",
                    "processed_at": f"step_{i}",
                    "value": random.randint(1, 1000)
                }
                batch_results.append(result)
            processed_data.extend(batch_results)
            # Save checkpoint for each batch
            checkpoint_manager.update_checkpoint(
                task_id,
                batch_end,
                TaskStatus.RUNNING,
                {"last_batch": batch_results}
            )
            # Save processed data
            checkpoint_manager.save_task_data(task_id, processed_data)
            # Report progress
            await context.report_progress(
                progress=batch_end,
                total=total_records,
                message=f"Processed {batch_end}/{total_records} records"
            )
            await context.debug(f"Batch {record_num}-{batch_end-1} completed")
        # Mark as completed
        checkpoint_manager.update_checkpoint(
            task_id, total_records, TaskStatus.COMPLETED
        )
        await context.info(f"✅ Processing completed: {len(processed_data)} records")
        return {
            "task_id": task_id,
            "dataset_name": dataset_name,
            "total_processed": len(processed_data),
            "records": processed_data[:5],  # Show first 5
            "status": "completed"
        }
    except Exception as e:
        # Mark as failed
        checkpoint_manager.update_checkpoint(
            task_id, record_num, TaskStatus.FAILED, error_message=str(e)
        )
        await context.error(f"❌ Error in processing: {e}")
        raise


@mcp.tool
async def pause_task(
    task_id: str,
    context: Context = None
) -> Dict[str, str]:
    """
    Pause a task in execution.

    Args:
        task_id: ID of the task to pause
    """
    checkpoint = checkpoint_manager.get_checkpoint(task_id)
    if not checkpoint:
        return {"error": f"Task {task_id} not found"}
    if checkpoint.status != TaskStatus.RUNNING:
        return {"error": f"Task {task_id} is not running"}
    success = checkpoint_manager.update_checkpoint(
        task_id, checkpoint.current_step, TaskStatus.PAUSED
    )
    if success:
        if context:
            await context.info(f"⏸️ Task {task_id} paused")
        return {"message": f"Task {task_id} paused correctly"}
    else:
        return {"error": f"Could not pause task {task_id}"}


@mcp.tool
async def list_session_tasks(
    client_id: Optional[str] = None,  # Persistent client ID
    context: Context = None
) -> Dict[str, Any]:
    """
    List all tasks of the current session or the specified client.

    Args:
        client_id: Persistent client id (optional)
    """
    if not context:
        return {"error": "Requires context"}
    # Use persistent client id if available, otherwise use session_id
    persistent_id = client_id if client_id else context.session_id
    if not persistent_id:
        return {"error": "Requires client_id or session_id"}
    checkpoints = checkpoint_manager.get_session_checkpoints(persistent_id)
    tasks = []
    for checkpoint in checkpoints:
        task_info = {
            "task_id": checkpoint.task_id,
            "task_name": checkpoint.task_name,
            "status": checkpoint.status.value,
            "progress": f"{checkpoint.current_step}/{checkpoint.total_steps}",
            "progress_percent": round((checkpoint.current_step / checkpoint.total_steps) * 100, 1) if checkpoint.total_steps > 0 else 0,
            "created_at": checkpoint.created_at.isoformat(),
            "updated_at": checkpoint.updated_at.isoformat()
        }
        tasks.append(task_info)
    return {
        "session_id": context.session_id,
        "persistent_id": persistent_id,
        "total_tasks": len(tasks),
        "tasks": tasks
    }


@mcp.tool
async def get_checkpoint_stats(context: Context = None) -> Dict[str, Any]:
    """
    Get global checkpoint statistics.
    """
    stats = checkpoint_manager.get_stats()
    return {
        "checkpoint_stats": stats,
        "total_checkpoints": sum(stats.values())
    }


@mcp.tool
async def get_session_info(context: Context = None) -> Dict[str, Any]:
    """
    Get information about the current session.
    """
    if not context:
        return {"error": "No context available"}
    return {
        "session_id": context.session_id,
        "request_id": context.request_id,
        "client_id": context.client_id
    }


async def run_resumable_server(host: str = "127.0.0.1", port: int = 8001):
    """Run server with resumability capabilities."""
    print(f"🚀 Starting MCP server with resumability at {host}:{port}")
    # Create application
    app = create_streamable_http_app(
        server=mcp,
        streamable_http_path="/mcp/",
        stateless_http=False,  # IMPORTANT: stateful for sessions
        debug=True
    )
    config = uvicorn.Config(
        app=app,
        host=host,
        port=port,
        log_level="info",
        access_log=False
    )
    server = uvicorn.Server(config)
    print(f"✅ Resumable server ready at http://{host}:{port}/mcp/")
    print("🔧 Tools with resumability:")
    print(" - resumable_data_processing: Processing with checkpoints")
    # print(" - large_file_download: Download resumable")
    # print(" - machine_learning_training: ML training resumable")
    print(" - pause_task: Pause tasks")
    print(" - list_session_tasks: List session tasks")
    print(" - get_checkpoint_stats: Checkpoint statistics")
    print(" - get_session_info: Current session information")
    await server.serve()


if __name__ == "__main__":
    try:
        asyncio.run(run_resumable_server())
    except KeyboardInterrupt:
        print("\n⏹️ Server stopped by user")
    except Exception as e:
        print(f"❌ Error running server: {e}")
	
Writing MCP_resumable_server/server.py
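
To see why saving `batch_end` as the checkpoint step is enough to resume, it helps to look at the batch ranges on their own. The sketch below isolates the `range(start_record, total_records, batch_size)` logic of `resumable_data_processing` in a helper called `remaining_batches`, a name invented for this example that does not exist in the server.

```python
from typing import List, Tuple


def remaining_batches(start_record: int, total_records: int, batch_size: int) -> List[Tuple[int, int]]:
    """Return the (start, end) record ranges still to be processed; end is exclusive."""
    return [
        (record_num, min(record_num + batch_size, total_records))
        for record_num in range(start_record, total_records, batch_size)
    ]


fresh = remaining_batches(0, 25, 10)      # new task: every batch is pending
resumed = remaining_batches(20, 25, 10)   # checkpoint saved after the second batch
finished = remaining_batches(25, 25, 10)  # checkpoint at the end: nothing left

print(fresh)
print(resumed)
print(finished)
```

Because the checkpoint always stores the end of the last finished batch, a resumed run starts exactly at the first unprocessed record and no batch is repeated or skipped.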

Resumable MCP client

Here too, the client is very similar to the one in the streamable MCP post, but it adds a JSON file with information about the sessions it has created, so that if a session is interrupted in the middle of a process, the process can be continued from where it left off.
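
The key piece of that JSON file is a client ID that survives restarts, since the session ID assigned by the server can change with every connection. As a rough sketch of the idea, assuming a helper named `load_or_create_client_id` that is not part of the client written in this post, the ID is generated once and read back on later runs:

```python
import json
import tempfile
import uuid
from pathlib import Path


def load_or_create_client_id(state_file: Path) -> str:
    """Return the client ID stored in state_file, creating it on first use."""
    if state_file.exists():
        return json.loads(state_file.read_text())["client_id"]
    # First run: generate an ID and persist it for future sessions
    client_id = str(uuid.uuid4())[:16]
    state_file.write_text(json.dumps({"client_id": client_id}, indent=2))
    return client_id


with tempfile.TemporaryDirectory() as tmp:
    state_file = Path(tmp) / "session_state.json"
    first_run = load_or_create_client_id(state_file)   # creates the file
    second_run = load_or_create_client_id(state_file)  # reads the same ID back
    print(first_run == second_run)
```

Sending this stable ID with every tool call is what lets the server map a new connection to the checkpoints of an earlier one.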

Implement resumable MCP client

Create the virtual environment for the client

First, we create the folder where we are going to develop it.

	
!mkdir MCP_resumable_client

We initialize the project with uv

	
!cd MCP_resumable_client && uv init .
	
Initialized project `mcp-resumable-client` at `/Users/macm1/Documents/web/portafolio/posts/MCP_resumable_client`

We create the virtual environment

	
!cd MCP_resumable_client && uv venv
	
Using CPython 3.12.8
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate

We install the necessary libraries

	
!cd MCP_resumable_client && uv add fastmcp
	
Resolved 64 packages in 314ms
Installed 61 packages in 145ms
+ annotated-types==0.7.0
+ anyio==4.10.0
+ attrs==25.3.0
+ authlib==1.6.2
+ certifi==2025.8.3
+ cffi==1.17.1
+ charset-normalizer==3.4.3
+ click==8.2.1
+ cryptography==45.0.6
+ cyclopts==3.22.5
+ dnspython==2.7.0
+ docstring-parser==0.17.0
+ docutils==0.22
+ email-validator==2.2.0
+ exceptiongroup==1.3.0
+ fastmcp==2.11.3
+ h11==0.16.0
+ httpcore==1.0.9
+ httpx==0.28.1
+ httpx-sse==0.4.1
+ idna==3.10
+ isodate==0.7.2
+ jsonschema==4.25.1
+ jsonschema-path==0.3.4
+ jsonschema-specifications==2025.4.1
+ lazy-object-proxy==1.12.0
+ markdown-it-py==4.0.0
+ markupsafe==3.0.2
+ mcp==1.13.1
+ mdurl==0.1.2
+ more-itertools==10.7.0
+ openapi-core==0.19.5
+ openapi-pydantic==0.5.1
+ openapi-schema-validator==0.6.3
+ openapi-spec-validator==0.7.2
+ parse==1.20.2
+ pathable==0.4.4
+ pycparser==2.22
+ pydantic==2.11.7
+ pydantic-core==2.33.2
+ pydantic-settings==2.10.1
+ pygments==2.19.2
+ pyperclip==1.9.0
+ python-dotenv==1.1.1
+ python-multipart==0.0.20
+ pyyaml==6.0.2
+ referencing==0.36.2
+ requests==2.32.5
+ rfc3339-validator==0.1.4
+ rich==14.1.0
+ rich-rst==1.3.1
+ rpds-py==0.27.0
+ six==1.17.0
+ sniffio==1.3.1
+ sse-starlette==3.0.2
+ starlette==0.47.3
+ typing-extensions==4.14.1
+ typing-inspection==0.4.1
+ urllib3==2.5.0
+ uvicorn==0.35.0
+ werkzeug==3.1.1

Client code

Now let's create the MCP client. It will be very similar to the one in the previous post, streamable MCP (this is the last time I'll say this, I promise), except that we also save the sessions in a JSON file so that we can recover any that were left incomplete in a server process.

	
%%writefile MCP_resumable_client/client.py
#!/usr/bin/env python3
"""
MCP client with resumability and session persistence capabilities.
Demonstrates how to connect to a resumable server and handle interruptions.
"""
import asyncio
import json
import time
import uuid
from typing import Any, Dict, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport


@dataclass
class TaskProgress:
    """Represents the progress of a task."""
    task_id: str
    task_name: str
    current_step: int
    total_steps: int
    last_message: str
    started_at: datetime = field(default_factory=datetime.now)
    last_update: datetime = field(default_factory=datetime.now)

    @property
    def progress_percent(self) -> float:
        """Calculate progress percentage."""
        if self.total_steps == 0:
            return 0.0
        return (self.current_step / self.total_steps) * 100


@dataclass
class SessionState:
    """Session state of the client."""
    client_id: str  # Persistent client ID
    session_id: str  # Session ID of the server (can change)
    server_url: str
    active_tasks: Dict[str, TaskProgress]
    completed_tasks: List[str]
    created_at: datetime = field(default_factory=datetime.now)


class ResumableProgressHandler:
    """Handles progress with resumability capabilities."""

    def __init__(self, task_name: str, session_state: SessionState):
        self.task_name = task_name
        self.session_state = session_state
        self.start_time = time.time()
        self.last_progress = 0
        self.task_progress: Optional[TaskProgress] = None

    # Declared async because the MCP client awaits progress callbacks
    async def __call__(self, progress: float, total: Optional[float], message: Optional[str]):
        """Callback for progress updates."""
        # total and message are optional in progress notifications
        total = total or 0
        message = message or ""
        # Create or update task progress
        if not self.task_progress:
            # Try to find task_id from the message
            task_id = self._extract_task_id(message)
            if not task_id:
                task_id = f"task_{int(time.time())}"
            self.task_progress = TaskProgress(
                task_id=task_id,
                task_name=self.task_name,
                current_step=int(progress),
                total_steps=int(total),
                last_message=message
            )
            self.session_state.active_tasks[task_id] = self.task_progress
        else:
            self.task_progress.current_step = int(progress)
            self.task_progress.total_steps = int(total)
            self.task_progress.last_message = message
            self.task_progress.last_update = datetime.now()
        # Display progress visually
        self._display_progress(progress, total, message)
        # Save session state
        self._save_session_state()

    def _extract_task_id(self, message: str) -> Optional[str]:
        """Try to extract task_id from the message."""
        # Search for patterns like "task_id: abc123" in the message
        import re
        match = re.search(r'task[_\s]*id[:\s]*([a-f0-9]{16})', message.lower())
        return match.group(1) if match else None

    def _display_progress(self, progress: float, total: float, message: str):
        """Display progress visually."""
        percentage = (progress / total) * 100 if total > 0 else 0
        bar_length = 30
        filled_length = int(bar_length * percentage / 100)
        bar = '█' * filled_length + '░' * (bar_length - filled_length)
        elapsed = time.time() - self.start_time
        # \r returns to the start of the line so the bar is redrawn in place
        print(f"\r📊 {self.task_name}: |{bar}| {percentage:.1f}% "
              f"({progress:.0f}/{total:.0f}) - {message} [{elapsed:.1f}s]",
              end='', flush=True)
        if progress >= total:
            print()  # New line when completed
            if self.task_progress:
                # Move to completed
                task_id = self.task_progress.task_id
                self.session_state.completed_tasks.append(task_id)
                if task_id in self.session_state.active_tasks:
                    del self.session_state.active_tasks[task_id]
                self._save_session_state()

    def _save_session_state(self):
        """Save session state."""
        # In a real case, this would be saved to disk
        pass


class MCPResumableClient:
    """MCP client with resumability capabilities."""

    def __init__(self, server_url: str = "http://localhost:8001/mcp/"):
        self.server_url = server_url
        self.transport = None
        self.client = None
        self.session_state: Optional[SessionState] = None
        self.state_file = Path("session_state.json")

    async def __aenter__(self):
        """Initialize connection and load state."""
        # Load previous state if it exists
        self._load_session_state()
        self.transport = StreamableHttpTransport(
            url=self.server_url,
            sse_read_timeout=120.0  # Timeout for resumable tasks
        )
        self.client = Client(transport=self.transport)
        await self.client.__aenter__()
        # Create new session state if it doesn't exist
        if not self.session_state:
            # Generate unique persistent client ID
            client_id = str(uuid.uuid4())[:16]  # Use only the first 16 characters
            # Get real session_id from the server
            try:
                session_info_result = await self.client.call_tool("get_session_info", {})
                # Extract content from CallToolResult
                if hasattr(session_info_result, 'content') and session_info_result.content:
                    session_info_text = session_info_result.content[0].text
                    if isinstance(session_info_text, str):
                        session_info = json.loads(session_info_text)
                    else:
                        session_info = session_info_text
                else:
                    session_info = {}
                real_session_id = session_info.get("session_id", f"session_{int(time.time())}")
                print("🏗️ Creating new session")
                print(f"🔗 Session ID of the server: {real_session_id}")
                print(f"🆔 Persistent Client ID: {client_id}")
            except Exception as e:
                print(f"⚠️ Could not get session_id from the server: {e}")
                real_session_id = f"session_{int(time.time())}"
            self.session_state = SessionState(
                client_id=client_id,
                session_id=real_session_id,
                server_url=self.server_url,
                active_tasks={},
                completed_tasks=[]
            )
            self._save_session_state()
        else:
            # Reuse existing client_id
            print(f"🆔 Reusing Persistent Client ID: {self.session_state.client_id}")
            # Verify if the session_id matches the server
            try:
                session_info_result = await self.client.call_tool("get_session_info", {})
                # Extract content from CallToolResult
                if hasattr(session_info_result, 'content') and session_info_result.content:
                    session_info_text = session_info_result.content[0].text
                    if isinstance(session_info_text, str):
                        session_info = json.loads(session_info_text)
                    else:
                        session_info = session_info_text
                else:
                    session_info = {}
                server_session_id = session_info.get("session_id")
                if server_session_id and server_session_id != self.session_state.session_id:
                    print(f"⚠️ Session ID of the server changed: {self.session_state.session_id} -> {server_session_id}")
                    self.session_state.session_id = server_session_id
                    self._save_session_state()
            except Exception as e:
                print(f"⚠️ Could not verify session_id: {e}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close connection and save state."""
        if self.client:
            await self.client.__aexit__(exc_type, exc_val, exc_tb)
        self._save_session_state()

    def _load_session_state(self):
        """Load session state from disk."""
        if self.state_file.exists():
            try:
                with open(self.state_file, 'r') as f:
                    data = json.load(f)
                # Reconstruct objects
                active_tasks = {}
                for task_id, task_data in data.get('active_tasks', {}).items():
                    task_data['started_at'] = datetime.fromisoformat(task_data['started_at'])
                    task_data['last_update'] = datetime.fromisoformat(task_data['last_update'])
                    active_tasks[task_id] = TaskProgress(**task_data)
                self.session_state = SessionState(
                    client_id=data.get('client_id', str(uuid.uuid4())[:16]),  # Generate if not exists
                    session_id=data['session_id'],
                    server_url=data['server_url'],
                    active_tasks=active_tasks,
                    completed_tasks=data.get('completed_tasks', []),
                    created_at=datetime.fromisoformat(data['created_at'])
                )
                print(f"📂 Session state loaded: {self.session_state.session_id}")
            except Exception as e:
                print(f"⚠️ Error loading session state: {e}")
                self.session_state = None
        else:
            print("💬 No session state found, starting new session")
            self.session_state = None

    def _save_session_state(self):
        """Save session state to disk."""
        if not self.session_state:
            return
        try:
            # Convert to serializable format
            active_tasks = {}
            for task_id, task_progress in self.session_state.active_tasks.items():
                active_tasks[task_id] = {
                    'task_id': task_progress.task_id,
                    'task_name': task_progress.task_name,
                    'current_step': task_progress.current_step,
                    'total_steps': task_progress.total_steps,
                    'last_message': task_progress.last_message,
                    'started_at': task_progress.started_at.isoformat(),
                    'last_update': task_progress.last_update.isoformat()
                }
            data = {
                'client_id': self.session_state.client_id,
                'session_id': self.session_state.session_id,
                'server_url': self.session_state.server_url,
                'active_tasks': active_tasks,
                'completed_tasks': self.session_state.completed_tasks,
                'created_at': self.session_state.created_at.isoformat()
            }
            with open(self.state_file, 'w') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            print(f"⚠️ Error saving session state: {e}")

    async def test_connection(self) -> bool:
        """Test connection to the server."""
        try:
            await self.client.ping()
            print("✅ Connection established with the resumable server")
            return True
        except Exception as e:
            print(f"❌ Connection error: {e}")
            return False

    async def call_resumable_tool(
        self,
        tool_name: str,
        parameters: Dict[str, Any],
        allow_resume: bool = True
    ) -> Dict[str, Any]:
        """Call tool with resumability capabilities."""
        progress_handler = ResumableProgressHandler(tool_name, self.session_state)
        start_time = time.time()
        # Add client_id to the parameters if allow_resume is enabled
        if allow_resume and self.session_state:
            parameters = {**parameters, 'client_id': self.session_state.client_id}
        try:
            print(f"🚀 Executing {tool_name} (resumption: {'✅' if allow_resume else '❌'})")
            result = await self.client.call_tool(
                tool_name,
                parameters,
                progress_handler=progress_handler
            )
            duration = time.time() - start_time
            print(f"✅ {tool_name} completed in {duration:.2f}s")
            # Extract content from CallToolResult
            if hasattr(result, 'content') and result.content:
                if isinstance(result.content[0].text, str):
                    try:
                        return json.loads(result.content[0].text)
                    except json.JSONDecodeError:
                        return {"text": result.content[0].text}
                else:
                    return result.content[0].text
            else:
                return {"error": "No content in result"}
        except KeyboardInterrupt:
            print(f"\n⏸️ {tool_name} interrupted by the user")
            # In a real implementation, here we would pause the task
            raise
        except Exception as e:
            duration = time.time() - start_time
            print(f"❌ {tool_name} failed after {duration:.2f}s: {e}")
            raise
async def list_session_tasks(self) -> Dict[str, Any]:
"""List session tasks."""
try:
parameters = {}
if self.session_state:
parameters['client_id'] = self.session_state.client_id
result = await self.client.call_tool("list_session_tasks", parameters)
# Extract content from CallToolResult
if hasattr(result, 'content') and result.content:
if isinstance(result.content[0].text, str):
try:
return json.loads(result.content[0].text)
except json.JSONDecodeError:
return {"text": result.content[0].text}
else:
return result.content[0].text
else:
return {"error": "No content in result"}
except Exception as e:
print(f"❌ Error listing session tasks: {e}")
return {"error": str(e)}

    async def pause_task(self, task_id: str) -> Dict[str, Any]:
        """Pause a specific task."""
        try:
            result = await self.client.call_tool("pause_task", {"task_id": task_id})
            # Extract content from CallToolResult
            if hasattr(result, 'content') and result.content:
                if isinstance(result.content[0].text, str):
                    try:
                        return json.loads(result.content[0].text)
                    except json.JSONDecodeError:
                        return {"text": result.content[0].text}
                else:
                    return result.content[0].text
            else:
                return {"error": "No content in result"}
        except Exception as e:
            print(f"❌ Error pausing task {task_id}: {e}")
            return {"error": str(e)}

    async def get_checkpoint_stats(self) -> Dict[str, Any]:
        """Get checkpoint statistics."""
        try:
            result = await self.client.call_tool("get_checkpoint_stats", {})
            # Extract content from CallToolResult
            if hasattr(result, 'content') and result.content:
                if isinstance(result.content[0].text, str):
                    try:
                        return json.loads(result.content[0].text)
                    except json.JSONDecodeError:
                        return {"text": result.content[0].text}
                else:
                    return result.content[0].text
            else:
                return {"error": "No content in result"}
        except Exception as e:
            print(f"❌ Error getting checkpoint statistics: {e}")
            return {"error": str(e)}

    def show_session_summary(self):
        """Show session summary."""
        if not self.session_state:
            print("❌ No session state available")
            return
        print(" " + "="*60)
        print("📋 SESSION SUMMARY")
        print("="*60)
        print(f"🆔 Client ID (persistent): {self.session_state.client_id}")
        print(f"📌 Session ID (server): {self.session_state.session_id}")
        print(f"🔗 Server: {self.session_state.server_url}")
        print(f"📅 Created: {self.session_state.created_at.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f" 🔄 Active Tasks: {len(self.session_state.active_tasks)}")
        for task_id, task in self.session_state.active_tasks.items():
            progress = task.progress_percent
            print(f" • {task.task_name} ({task_id[:8]}...): {progress:.1f}%")
            print(f" └─ {task.last_message}")
        print(f" ✅ Completed Tasks: {len(self.session_state.completed_tasks)}")
        for task_id in self.session_state.completed_tasks[-5:]:  # Last 5
            print(f" • {task_id[:8]}...")


async def demo_resumable_data_processing(client: MCPResumableClient):
    """Demo of resumable data processing."""
    print(" " + "="*60)
    print("📊 DEMO: Resumable Data Processing")
    print("="*60)
    result = await client.call_resumable_tool(
        "resumable_data_processing",
        {
            "dataset_name": "customer_data",
            "total_records": 50,
            "batch_size": 5
        }
    )
    if "error" not in result:
        print(f"📊 Result: {result.get('total_processed', 0)} records processed")
        print(f"🆔 Task ID: {result.get('task_id', 'N/A')}")


async def interactive_demo(client: MCPResumableClient):
    """Interactive demo with options."""
    while True:
        print(" " + "="*60)
        print("🎮 INTERACTIVE DEMO - Resumable Client")
        print("="*60)
        print("1. Resumable data processing")
        print("2. List session tasks")
        print("3. Checkpoint statistics")
        print("4. Session summary")
        print("0. Exit")
        print("-" * 60)
        try:
            choice = input("Select an option (0-4): ").strip()
            if choice == "0":
                break
            elif choice == "1":
                await demo_resumable_data_processing(client)
            elif choice == "2":
                tasks = await client.list_session_tasks()
                print(f"📋 Session tasks: {json.dumps(tasks, indent=2)}")
            elif choice == "3":
                stats = await client.get_checkpoint_stats()
                print(f"📊 Checkpoint statistics: {json.dumps(stats, indent=2)}")
            elif choice == "4":
                client.show_session_summary()
            else:
                print("❌ Invalid option")
        except KeyboardInterrupt:
            print(" ⏸️ Demo interrupted")
            break
        except Exception as e:
            print(f"❌ Error: {e}")


async def run_resumable_demo():
    """Run resumable demo."""
    print("🌟 Resumable Client MCP - Demo")
    print("="*60)
    try:
        async with MCPResumableClient() as client:
            # Test connection
            if not await client.test_connection():
                print("❌ Could not connect to the resumable server.")
                print(" Make sure the server is running on port 8001")
                return
            # Show session state
            client.show_session_summary()
            # Run interactive demo
            await interactive_demo(client)
            print(" 🎉 Demo completed")
    except Exception as e:
        print(f"❌ Error in the demo: {e}")


if __name__ == "__main__":
    try:
        asyncio.run(run_resumable_demo())
    except KeyboardInterrupt:
        print(" ⏹️ Demo interrupted by the user")
    except Exception as e:
        print(f"❌ Error running demo: {e}")
	
Overwriting MCP_resumable_client/client.py
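
The four tool-calling methods in the client repeat the same block that unpacks the tool result. As a minimal sketch, that logic could live in a single helper. The name `extract_tool_result` is hypothetical and is not part of the original client, and the `SimpleNamespace` objects below only imitate the `content[0].text` shape that the client code reads; they are not FastMCP classes.

```python
import json
from types import SimpleNamespace
from typing import Any


def extract_tool_result(result: Any) -> Any:
    """Unpack a tool result the same way the client methods do."""
    content = getattr(result, "content", None)
    if not content:
        return {"error": "No content in result"}
    text = content[0].text
    if not isinstance(text, str):
        return text
    try:
        # Tools in this post return JSON encoded as text
        return json.loads(text)
    except json.JSONDecodeError:
        # Fall back to wrapping plain text
        return {"text": text}


# Stand-in results (hypothetical) that imitate the shape the client expects
ok = SimpleNamespace(content=[SimpleNamespace(text='{"total_processed": 50}')])
plain = SimpleNamespace(content=[SimpleNamespace(text="not json")])
empty = SimpleNamespace(content=[])

print(extract_tool_result(ok))     # {'total_processed': 50}
print(extract_tool_result(plain))  # {'text': 'not json'}
print(extract_tool_result(empty))  # {'error': 'No content in result'}
```

With a helper like this, each method would only need to call the tool and pass the result through it, keeping the error handling in one place.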

Execution

The plan is to start the server, launch the client, request a task from the server, stop both the client and the server to simulate a lost connection, and then restart them to resume the task where it left off.

First execution - process interrupted

Before executing anything, let's look at the contents of the server and client folders.

Let's see what's in the server folder.

	
!cd MCP_resumable_server && ls -lha
	
total 336
drwxr-xr-x@ 11 macm1 staff 352B Aug 25 10:26 .
drwxr-xr-x 112 macm1 staff 3.5K Aug 25 09:56 ..
-rw-r--r--@ 1 macm1 staff 5B Aug 25 09:37 .python-version
drwxr-xr-x@ 8 macm1 staff 256B Aug 25 09:38 .venv
-rw-r--r--@ 1 macm1 staff 0B Aug 25 09:37 README.md
drwxr-xr-x@ 3 macm1 staff 96B Aug 25 09:42 __pycache__
-rw-r--r--@ 1 macm1 staff 8.4K Aug 25 09:40 checkpoint_manager.py
-rw-r--r--@ 1 macm1 staff 98B Aug 25 09:37 main.py
-rw-r--r--@ 1 macm1 staff 213B Aug 25 09:38 pyproject.toml
-rw-r--r--@ 1 macm1 staff 9.2K Aug 25 09:42 server.py
-rw-r--r--@ 1 macm1 staff 128K Aug 25 09:38 uv.lock

The files .python-version, .venv, README.md, main.py, pyproject.toml, and uv.lock were created by uv when we set up the project, and __pycache__ was generated by Python. The files checkpoint_manager.py and server.py are the ones we created for the resumable MCP server.

Now let's see what files are in the client folder.

	
!cd MCP_resumable_client && ls -lha
	
total 336
drwxr-xr-x@ 9 macm1 staff 288B Aug 25 10:26 .
drwxr-xr-x 112 macm1 staff 3.5K Aug 25 09:56 ..
-rw-r--r--@ 1 macm1 staff 5B Aug 25 09:56 .python-version
drwxr-xr-x@ 8 macm1 staff 256B Aug 25 09:57 .venv
-rw-r--r--@ 1 macm1 staff 0B Aug 25 09:56 README.md
-rw-r--r--@ 1 macm1 staff 20K Aug 25 10:24 client.py
-rw-r--r--@ 1 macm1 staff 98B Aug 25 09:56 main.py
-rw-r--r--@ 1 macm1 staff 190B Aug 25 09:57 pyproject.toml
-rw-r--r--@ 1 macm1 staff 128K Aug 25 09:57 uv.lock

As before, we have the files .python-version, .venv, README.md, main.py, pyproject.toml, and uv.lock, which are the files created by uv when creating the virtual environment. And we have the file client.py, which is the file we created for the resumable MCP client.

Once we have seen this, we start the server.

	
!cd MCP_resumable_server && source .venv/bin/activate && uv run server.py
	
🚀 Starting MCP server with resumability at 127.0.0.1:8001
✅ Resumable server ready at http://127.0.0.1:8001/mcp/
🔧 Tools with resumability:
- resumable_data_processing: Processing with checkpoints
- pause_task: Pause tasks
- list_session_tasks: List session tasks
- get_checkpoint_stats: Checkpoint statistics
- get_session_info: Current session information
INFO: Started server process [47049]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8001 (Press CTRL+C to quit)

Now we run the client, select option 1 (Resumable data processing), and stop it halfway through.

	
!cd MCP_resumable_client && source .venv/bin/activate && uv run client.py
	
🌟 Resumable Client MCP - Demo
============================================================
💬 No session state found, starting new session
🏗️ Creating new session
🔗 Session ID of the server: 6bf64139282e41e0a2233ec92647bf02
🆔 Persistent Client ID: f0af843a-9b0e-42
✅ Connection established with the resumable server
============================================================
📋 SESSION SUMMARY
============================================================
🆔 Client ID (persistent): f0af843a-9b0e-42
📌 Session ID (server): 6bf64139282e41e0a2233ec92647bf02
🔗 Server: http://localhost:8001/mcp/
📅 Created: 2025-08-25 10:34:53
🔄 Active Tasks: 0
✅ Completed Tasks: 0
============================================================
🎮 INTERACTIVE DEMO - Resumable Client
============================================================
1. Resumable data processing
2. List session tasks
3. Checkpoint statistics
4. Session summary
0. Exit
------------------------------------------------------------
Select an option (0-4): 1
============================================================
📊 DEMO: Resumable Data Processing
============================================================
🚀 Executing resumable_data_processing (resumption: ✅)
[08/25/25 10:35:19] INFO Server log: logging.py:40
🆕 Starting new processing of 50 records
📊 resumable_data_processing: |███░░░░░░░░░░░░░░░░░░░░░░░░░░░| 10.0% (5/50) - Processed 5/50 records [1.0s]
📊 resumable_data_processing: |██████░░░░░░░░░░░░░░░░░░░░░░░░| 20.0% (10/50) - Processed 10/50 records [2.0s]
📊 resumable_data_processing: |█████████░░░░░░░░░░░░░░░░░░░░░| 30.0% (15/50) - Processed 15/50 records [3.0s]
^C
⏹️ Demo interrupted by the user

We also stop the server.

Now that we have stopped the server and client, let's take another look at what's in each folder.

First, let's see what's in the server folder.

	
!cd MCP_resumable_server && ls -lha
	
total 336
drwxr-xr-x@ 12 macm1 staff 384B Aug 25 10:33 .
drwxr-xr-x 112 macm1 staff 3.5K Aug 25 09:56 ..
-rw-r--r--@ 1 macm1 staff 5B Aug 25 09:37 .python-version
drwxr-xr-x@ 8 macm1 staff 256B Aug 25 09:38 .venv
-rw-r--r--@ 1 macm1 staff 0B Aug 25 09:37 README.md
drwxr-xr-x@ 3 macm1 staff 96B Aug 25 09:42 __pycache__
-rw-r--r--@ 1 macm1 staff 8.4K Aug 25 09:40 checkpoint_manager.py
drwxr-xr-x@ 4 macm1 staff 128B Aug 25 10:35 checkpoints
-rw-r--r--@ 1 macm1 staff 98B Aug 25 09:37 main.py
-rw-r--r--@ 1 macm1 staff 213B Aug 25 09:38 pyproject.toml
-rw-r--r--@ 1 macm1 staff 9.2K Aug 25 09:42 server.py
-rw-r--r--@ 1 macm1 staff 128K Aug 25 09:38 uv.lock

We can see that in addition to everything that was there before, there is now also a folder called checkpoints that contains the checkpoints for the tasks that have been executed.

	
!cd MCP_resumable_server/checkpoints && ls -lha
	
total 8
drwxr-xr-x@ 4 macm1 staff 128B Aug 25 10:35 .
drwxr-xr-x@ 12 macm1 staff 384B Aug 25 10:33 ..
-rw-r--r--@ 1 macm1 staff 1.1K Aug 25 10:35 checkpoints.json
drwxr-xr-x@ 3 macm1 staff 96B Aug 25 10:35 data

If we look at the contents of checkpoints.json, two fields stand out:

"current_step": 15,
"total_steps": 50,
	
!cd MCP_resumable_server/checkpoints && cat checkpoints.json
	
{
"9d7bc504c50e0bce": {
"task_id": "9d7bc504c50e0bce",
"session_id": "864eec3a-be1b-43",
"task_name": "resumable_data_processing",
"parameters": {
"dataset_name": "customer_data",
"total_records": 50,
"batch_size": 5
},
"current_step": 15,
"total_steps": 50,
"status": "failed",
"created_at": "2025-08-25T08:35:19.349142+00:00",
"updated_at": "2025-08-25T08:35:23.366672+00:00",
"data": {
"last_batch": [
{
"record_id": "customer_data_000015",
"processed_at": "step_15",
"value": 88
},
{
"record_id": "customer_data_000016",
"processed_at": "step_16",
"value": 17
},
{
"record_id": "customer_data_000017",
"processed_at": "step_17",
"value": 596
},
{
"record_id": "customer_data_000018",
"processed_at": "step_18",
"value": 693
},
{
"record_id": "customer_data_000019",
"processed_at": "step_19",
"value": 34
}
]
},
"error_message": ""
}
}

In other words, the task stopped at step 15 of 50, which, as the client's output shows, is exactly where we interrupted the process.

📊 resumable_data_processing: |███░░░░░░░░░░░░░░░░░░░░░░░░░░░| 10.0% (5/50) - Processed 5/50 records [1.0s]
📊 resumable_data_processing: |██████░░░░░░░░░░░░░░░░░░░░░░░░| 20.0% (10/50) - Processed 10/50 records [2.0s]
📊 resumable_data_processing: |█████████░░░░░░░░░░░░░░░░░░░░░| 30.0% (15/50) - Processed 15/50 records [3.0s]
^C
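
To check a checkpoint without reading the JSON by eye, a few lines of Python can compute the progress from `current_step` and `total_steps`. This is only an illustrative sketch: the function name `summarize_checkpoints` is hypothetical, and the JSON is a trimmed copy of the checkpoint file from this run (the `data` field and timestamps are omitted).

```python
import json

# Trimmed copy of the checkpoint written by the server in this run
raw = """
{
  "9d7bc504c50e0bce": {
    "task_id": "9d7bc504c50e0bce",
    "task_name": "resumable_data_processing",
    "current_step": 15,
    "total_steps": 50,
    "status": "failed"
  }
}
"""


def summarize_checkpoints(checkpoints: dict) -> list[str]:
    """Return one human-readable progress line per checkpoint."""
    lines = []
    for task_id, cp in checkpoints.items():
        percent = 100 * cp["current_step"] / cp["total_steps"]
        lines.append(
            f"{cp['task_name']} ({task_id}): step {cp['current_step']}/"
            f"{cp['total_steps']} ({percent:.1f}%), status={cp['status']}"
        )
    return lines


summary = summarize_checkpoints(json.loads(raw))
for line in summary:
    print(line)
# resumable_data_processing (9d7bc504c50e0bce): step 15/50 (30.0%), status=failed
```

The 30.0% matches the last progress bar the client printed before the interruption.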

If we now look at the contents of the client folder, we can see that there is a new file called session_state.json.

	
!cd MCP_resumable_client && ls -lha
	
total 344
drwxr-xr-x@ 10 macm1 staff 320B Aug 25 10:35 .
drwxr-xr-x 112 macm1 staff 3.5K Aug 25 09:56 ..
-rw-r--r--@ 1 macm1 staff 5B Aug 25 09:56 .python-version
drwxr-xr-x@ 8 macm1 staff 256B Aug 25 09:57 .venv
-rw-r--r--@ 1 macm1 staff 0B Aug 25 09:56 README.md
-rw-r--r--@ 1 macm1 staff 20K Aug 25 10:24 client.py
-rw-r--r--@ 1 macm1 staff 98B Aug 25 09:56 main.py
-rw-r--r--@ 1 macm1 staff 190B Aug 25 09:57 pyproject.toml
-rw-r--r--@ 1 macm1 staff 546B Aug 25 10:35 session_state.json
-rw-r--r--@ 1 macm1 staff 128K Aug 25 09:57 uv.lock

Let's see what's inside.

	
!cd MCP_resumable_client && cat session_state.json
	
{
"client_id": "864eec3a-be1b-43",
"session_id": "d34ff655d6464fb4ae5eeb64c62989b0",
"server_url": "http://localhost:8001/mcp/",
"active_tasks": {
"task_1756110920": {
"task_id": "task_1756110920",
"task_name": "resumable_data_processing",
"current_step": 15,
"total_steps": 50,
"last_message": "Processed 15/50 records",
"started_at": "2025-08-25T10:35:20.353521",
"last_update": "2025-08-25T10:35:22.364507"
}
},
"completed_tasks": [],
"created_at": "2025-08-25T10:35:16.230926"
}

As in the server's JSON, we see

"current_step": 15,
"total_steps": 50,

The client has also recorded that the task stopped at step 15 of 50, but the most important thing is

"task_id": "task_1756110920",

which is where we store the task ID so we can ask the server to resume it.

Second execution - process resumed

If we now restart the server

	
!cd MCP_resumable_server && source .venv/bin/activate && uv run server.py
	
🚀 Starting MCP server with resumability at 127.0.0.1:8001
✅ Resumable server ready at http://127.0.0.1:8001/mcp/
🔧 Tools with resumability:
- resumable_data_processing: Processing with checkpoints
- pause_task: Pause tasks
- list_session_tasks: List session tasks
- get_checkpoint_stats: Checkpoint statistics
- get_session_info: Current session information
INFO: Started server process [52338]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8001 (Press CTRL+C to quit)

And we run the client again

	
!cd MCP_resumable_client && source .venv/bin/activate && uv run client.py
	
🌟 Resumable Client MCP - Demo
============================================================
📂 Session state loaded: d34ff655d6464fb4ae5eeb64c62989b0
🆔 Reusing Persistent Client ID: 864eec3a-be1b-43
⚠️ Session ID of the server changed: d34ff655d6464fb4ae5eeb64c62989b0 → 004bc91edde144898938136d0d2b5041
✅ Connection established with the resumable server
============================================================
📋 SESSION SUMMARY
============================================================
🆔 Client ID (persistent): 864eec3a-be1b-43
📌 Session ID (server): 004bc91edde144898938136d0d2b5041
🔗 Server: http://localhost:8001/mcp/
📅 Created: 2025-08-25 10:35:16
🔄 Active Tasks: 1
• resumable_data_processing (task_175...): 30.0%
└─ Processed 15/50 records
✅ Completed Tasks: 0
============================================================
🎮 INTERACTIVE DEMO - Resumable Client
============================================================
1. Resumable data processing
2. List session tasks
3. Checkpoint statistics
4. Session summary
0. Exit
------------------------------------------------------------
Select an option (0-4):

We can see that it says

🔄 Active Tasks: 1
• resumable_data_processing (task_175...): 30.0%
└─ Processed 15/50 records

The client has recovered the information that the task with ID task_1756110920 stopped at step 15 of 50.

Now we select option 1 again and see that it continues from step 15.

	
!cd MCP_resumable_client && source .venv/bin/activate && uv run client.py
	
🌟 Resumable Client MCP - Demo
============================================================
📂 Session state loaded: 004bc91edde144898938136d0d2b5041
🆔 Reusing Persistent Client ID: 864eec3a-be1b-43
⚠️ Session ID of the server changed: 004bc91edde144898938136d0d2b5041 → 4b93c36446504a3d92467d39a8cba589
✅ Connection established with the resumable server
============================================================
📋 SESSION SUMMARY
============================================================
🆔 Client ID (persistent): 864eec3a-be1b-43
📌 Session ID (server): 4b93c36446504a3d92467d39a8cba589
🔗 Server: http://localhost:8001/mcp/
📅 Created: 2025-08-25 10:35:16
🔄 Active Tasks: 1
• resumable_data_processing (task_175...): 30.0%
└─ Processed 15/50 records
✅ Completed Tasks: 0
============================================================
🎮 INTERACTIVE DEMO - Resumable Client
============================================================
1. Resumable data processing
2. List session tasks
3. Checkpoint statistics
4. Session summary
0. Exit
------------------------------------------------------------
Select an option (0-4): 1
============================================================
📊 DEMO: Resumable Data Processing
============================================================
🚀 Executing resumable_data_processing (resumption: ✅)
[08/25/25 10:57:31] INFO Server log: 🔄 Resuming logging.py:40
processing from record 15
📊 resumable_data_processing: |████████████░░░░░░░░░░░░░░░░░░| 40.0% (20/50) - Processed 20/50 records [1.0s]
📊 resumable_data_processing: |███████████████░░░░░░░░░░░░░░░| 50.0% (25/50) - Processed 25/50 records [2.0s]
📊 resumable_data_processing: |██████████████████░░░░░░░░░░░░| 60.0% (30/50) - Processed 30/50 records [3.0s]
📊 resumable_data_processing: |█████████████████████░░░░░░░░░| 70.0% (35/50) - Processed 35/50 records [4.0s]
📊 resumable_data_processing: |████████████████████████░░░░░░| 80.0% (40/50) - Processed 40/50 records [5.0s]
📊 resumable_data_processing: |███████████████████████████░░░| 90.0% (45/50) - Processed 45/50 records [6.0s]
📊 resumable_data_processing: |██████████████████████████████| 100.0% (50/50) - Processed 50/50 records [7.0s]
[08/25/25 10:57:38] INFO Server log: ✅ Processing logging.py:40
completed: 55 records
✅ resumable_data_processing completed in 7.05s
📊 Result: 55 records processed
🆔 Task ID: 88e68f3bdf65cd22
============================================================
🎮 INTERACTIVE DEMO - Resumable Client
============================================================
1. Resumable data processing
2. List session tasks
3. Checkpoint statistics
4. Session summary
0. Exit
------------------------------------------------------------
Select an option (0-4): 0
🎉 Demo completed

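We see that it says "Resuming processing from record 15", meaning that it has started where it left off. Note that the final count is reported as 55 records rather than 50: the checkpoint's last_batch already contained records 15 to 19, so that batch is processed and counted a second time after resuming.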
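
The resume behavior can be sketched independently of MCP. The example below is not the server code from this post; it is a minimal illustration under stated assumptions, with hypothetical names (`process_in_batches`, `checkpoint.json`, `stop_after`). It saves the checkpoint only after a batch has finished, so a resumed run starts at the first unprocessed record and no batch is counted twice.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


def process_in_batches(checkpoint_file: Path, total: int, batch_size: int,
                       stop_after: Optional[int] = None) -> int:
    """Process `total` records in batches, checkpointing after each batch.

    `stop_after` simulates an interruption once that many records are done.
    Returns the number of records processed so far.
    """
    # Resume from the saved step if a checkpoint exists, otherwise start at 0
    start = 0
    if checkpoint_file.exists():
        start = json.loads(checkpoint_file.read_text())["current_step"]

    done = start
    for begin in range(start, total, batch_size):
        if stop_after is not None and done >= stop_after:
            return done  # simulated crash or lost connection
        end = min(begin + batch_size, total)
        # ... real work on records begin..end-1 would go here ...
        done = end
        # Persist progress only after the batch is fully processed
        checkpoint_file.write_text(
            json.dumps({"current_step": done, "total_steps": total})
        )
    return done


with tempfile.TemporaryDirectory() as tmp:
    cp = Path(tmp) / "checkpoint.json"
    first = process_in_batches(cp, total=50, batch_size=5, stop_after=15)
    second = process_in_batches(cp, total=50, batch_size=5)
    print(first, second)  # 15 50
```

The first call stops at 15 records, and the second call reads the checkpoint and finishes at exactly 50.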

This way, we can now have MCP servers and clients that process long tasks and can be resumed in case of interruption.

This is also very useful when the task is very long, so that the client does not have to be running and waiting all the time. Instead, we can have the client request the task from the server and periodically request the status of the task.
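
The polling idea can be sketched as follows. This is a hedged illustration, not part of the client above: `poll_until_done` and `make_fake_status` are hypothetical names, and the fake status function stands in for a real call to the server (for example, one built on the client's `list_session_tasks` method).

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def poll_until_done(
    get_status: Callable[[], Awaitable[Dict[str, int]]],
    interval: float = 0.01,
    max_polls: int = 100,
) -> Dict[str, int]:
    """Call `get_status` every `interval` seconds until the task finishes."""
    for _ in range(max_polls):
        status = await get_status()
        if status["current_step"] >= status["total_steps"]:
            return status
        await asyncio.sleep(interval)
    raise TimeoutError("task did not finish within the polling budget")


def make_fake_status(total: int, step: int):
    """Hypothetical stand-in for a server call; advances `step` records per poll."""
    state = {"current_step": 0, "total_steps": total}

    async def get_status() -> Dict[str, int]:
        state["current_step"] = min(state["current_step"] + step, total)
        return dict(state)

    return get_status


final = asyncio.run(poll_until_done(make_fake_status(total=50, step=5)))
print(final)  # {'current_step': 50, 'total_steps': 50}
```

In a real setup the interval would be much longer than in this demo, and the client could exit between polls because the task state lives in the server's checkpoints.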
