MCP Durability: Server and Client with Persistence for Long-Running Tasks


In this post we will see how to build an MCP server and client where the client can request a long-running task from the server, shut down while the task keeps running on the server, then reconnect and check the task's status.

MCP Server with Durability

The server will be similar to the ones we built in the streamable MCP and resumable MCP posts, with the difference that we now add the ability for the client to request a long-running task, stop, let the task keep running on the server, reconnect, and check the task's status.

In addition, this time we will store the tasks in a SQLite database to make the setup more robust, since in previous posts we kept this information in JSON files.
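To see why SQLite gives us durability, here is a minimal sketch: it creates the same `tasks` table the server will create below, writes one finished task the way the server's `_save_task` does, and reads it back. It uses an in-memory database for brevity; the server writes to `mcp_tasks.db` on disk, so any process that opens that file can recover the state after a restart.

```python
import json
import sqlite3
import time
import uuid

# In-memory database for the sketch; the server uses "mcp_tasks.db" on disk.
conn = sqlite3.connect(":memory:")

# The same "tasks" table the server creates.
conn.execute("""
    CREATE TABLE IF NOT EXISTS tasks (
        task_id TEXT PRIMARY KEY,
        status TEXT NOT NULL,
        progress REAL DEFAULT 0.0,
        total REAL,
        message TEXT,
        result_data TEXT,
        error TEXT,
        created_at REAL NOT NULL,
        updated_at REAL NOT NULL,
        completed_at REAL
    )
""")

# Write a finished task, the way the server's _save_task does.
now = time.time()
conn.execute(
    "INSERT OR REPLACE INTO tasks VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
    (str(uuid.uuid4()), "completed", 100.0, 100.0, "done",
     json.dumps({"migrated_records": 1000}), None, now, now, now),
)
conn.commit()

# Reading the state back is all a reconnecting client needs.
rows = list(conn.execute("SELECT task_id, status, progress FROM tasks"))
print(rows[0][1], rows[0][2])  # completed 100.0
```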

Implementing the MCP Server with Durability

The server will have a DurableTaskManager class that handles creating tasks, storing them in the database, reporting their status, and cancelling them.

It will expose three resources: one to get the status of a task, one to list all tasks, and one to list tasks by status.

And it will have five tools: three start specific tasks (data migration, batch processing, and model training), one cancels a task, and the last one returns server statistics.
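The pattern all three task tools follow can be sketched with nothing but asyncio: the tool registers the task, kicks off the work in the background, and returns a `task://status/{task_id}` link immediately, while the caller polls that link until a terminal state. This is a simplified, in-memory stand-in for the SQLite-backed manager implemented below.

```python
import asyncio
import uuid

# In-memory stand-in for the SQLite-backed task store.
tasks: dict = {}

async def long_running_work(task_id: str, steps: int) -> None:
    """Background job: updates status and progress as it goes."""
    tasks[task_id]["status"] = "running"
    for i in range(steps):
        await asyncio.sleep(0)  # simulate one unit of work
        tasks[task_id]["progress"] = (i + 1) / steps * 100
    tasks[task_id]["status"] = "completed"

async def start_task(steps: int) -> str:
    """Like the server's tools: register the task, start the work,
    and return a resource link immediately."""
    task_id = str(uuid.uuid4())
    tasks[task_id] = {"status": "pending", "progress": 0.0}
    asyncio.create_task(long_running_work(task_id, steps))
    return f"task://status/{task_id}"

async def main() -> str:
    link = await start_task(steps=5)
    task_id = link.split("/")[-1]
    # The client polls the link until the task reaches a terminal state.
    while tasks[task_id]["status"] not in ("completed", "failed", "cancelled"):
        await asyncio.sleep(0.01)
    return link

link = asyncio.run(main())
print(link, tasks[link.split("/")[-1]])
```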

Create the Server Virtual Environment

First we create the folder where we will develop it.

!mkdir MCP_durability_server

We create the project with uv

!cd MCP_durability_server && uv init .

Initialized project `mcp-durability-server` at `/Users/macm1/Documents/web/portafolio/posts/MCP_durability_server`

We initialize the environment

!cd MCP_durability_server && uv venv

Using CPython 3.12.8
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate

We install the required libraries

!cd MCP_durability_server && uv add fastmcp

Resolved 64 packages in 344ms
Installed 61 packages in 136ms
+ annotated-types==0.7.0
+ anyio==4.10.0
+ attrs==25.3.0
+ authlib==1.6.3
+ certifi==2025.8.3
+ cffi==1.17.1
+ charset-normalizer==3.4.3
+ click==8.2.1
+ cryptography==45.0.6
+ cyclopts==3.23.0
+ dnspython==2.7.0
+ docstring-parser==0.17.0
+ docutils==0.22
+ email-validator==2.3.0
+ exceptiongroup==1.3.0
+ fastmcp==2.11.3
+ h11==0.16.0
+ httpcore==1.0.9
+ httpx==0.28.1
+ httpx-sse==0.4.1
+ idna==3.10
+ isodate==0.7.2
+ jsonschema==4.25.1
+ jsonschema-path==0.3.4
+ jsonschema-specifications==2025.4.1
+ lazy-object-proxy==1.12.0
+ markdown-it-py==4.0.0
+ markupsafe==3.0.2
+ mcp==1.13.1
+ mdurl==0.1.2
+ more-itertools==10.7.0
+ openapi-core==0.19.5
+ openapi-pydantic==0.5.1
+ openapi-schema-validator==0.6.3
+ openapi-spec-validator==0.7.2
+ parse==1.20.2
+ pathable==0.4.4
+ pycparser==2.22
+ pydantic==2.11.7
+ pydantic-core==2.33.2
+ pydantic-settings==2.10.1
+ pygments==2.19.2
+ pyperclip==1.9.0
+ python-dotenv==1.1.1
+ python-multipart==0.0.20
+ pyyaml==6.0.2
+ referencing==0.36.2
+ requests==2.32.5
+ rfc3339-validator==0.1.4
+ rich==14.1.0
+ rich-rst==1.3.1
+ rpds-py==0.27.1
+ six==1.17.0
+ sniffio==1.3.1
+ sse-starlette==3.0.2
+ starlette==0.47.3
+ typing-extensions==4.15.0
+ typing-inspection==0.4.1
+ urllib3==2.5.0
+ uvicorn==0.35.0
+ werkzeug==3.1.1

Server Code

%%writefile MCP_durability_server/server.py
"""
MCP Durability Server

MCP server that implements durability for long-running agents using resource links.
Allows long-running operations to survive server restarts and provides out-of-band
state tracking.

Implemented pattern:
1. Tools return resource links immediately
2. Background processing continues asynchronously
3. Clients can poll or subscribe to resources for state updates

Usage:
    python server.py
"""
import asyncio
import json
import sqlite3
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional

from fastmcp import FastMCP
from fastmcp.server.context import Context


class TaskStatus(Enum):
    """Long-running task status."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


@dataclass
class TaskResult:
    """Result data for a completed task."""
    task_id: str
    status: TaskStatus
    progress: float = 0.0
    total: Optional[float] = None
    message: Optional[str] = None
    result_data: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    created_at: float = 0.0
    updated_at: float = 0.0
    completed_at: Optional[float] = None


class DurableTaskManager:
    """Manages persistent task state and background execution."""

    def __init__(self, db_path: str = "mcp_tasks.db"):
        self.db_path = db_path
        self._background_tasks: Dict[str, asyncio.Task] = {}
        self._setup_database()

    def _setup_database(self) -> None:
        """Initializes the SQLite database for task persistence."""
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                task_id TEXT PRIMARY KEY,
                status TEXT NOT NULL,
                progress REAL DEFAULT 0.0,
                total REAL,
                message TEXT,
                result_data TEXT,
                error TEXT,
                created_at REAL NOT NULL,
                updated_at REAL NOT NULL,
                completed_at REAL
            )
        """)
        conn.commit()
        conn.close()

    async def create_task(self, task_id: Optional[str] = None) -> str:
        """Creates a new task and returns its ID."""
        if not task_id:
            task_id = str(uuid.uuid4())
        current_time = time.time()
        task_result = TaskResult(
            task_id=task_id,
            status=TaskStatus.PENDING,
            created_at=current_time,
            updated_at=current_time
        )
        await self._save_task(task_result)
        return task_id

    async def start_background_task(
        self,
        task_id: str,
        task_function,
        context: Optional[Context] = None
    ) -> None:
        """Starts a background task and tracks its execution."""
        async def wrapper():
            try:
                await self._update_task_status(task_id, TaskStatus.RUNNING)
                # Execute the real task
                if context:
                    result = await task_function(task_id, context, self)
                else:
                    result = await task_function(task_id, self)
                # Mark as completed with results
                await self._update_task_completion(task_id, result)
            except asyncio.CancelledError:
                await self._update_task_status(task_id, TaskStatus.CANCELLED)
                raise
            except Exception as e:
                await self._update_task_error(task_id, str(e))
            finally:
                # Clean up background task reference
                self._background_tasks.pop(task_id, None)

        # Start the task in the background
        task = asyncio.create_task(wrapper())
        self._background_tasks[task_id] = task

    async def _save_task(self, task_result: TaskResult) -> None:
        """Saves the task result to the database."""
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            INSERT OR REPLACE INTO tasks
            (task_id, status, progress, total, message, result_data, error,
             created_at, updated_at, completed_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            task_result.task_id,
            task_result.status.value,
            task_result.progress,
            task_result.total,
            task_result.message,
            json.dumps(task_result.result_data) if task_result.result_data else None,
            task_result.error,
            task_result.created_at,
            task_result.updated_at,
            task_result.completed_at
        ))
        conn.commit()
        conn.close()

    async def get_task(self, task_id: str) -> Optional[TaskResult]:
        """Retrieves a task result from the database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.execute("""
            SELECT task_id, status, progress, total, message, result_data, error,
                   created_at, updated_at, completed_at
            FROM tasks WHERE task_id = ?
        """, (task_id,))
        row = cursor.fetchone()
        conn.close()
        if not row:
            return None
        result_data = None
        if row[5]:  # result_data
            try:
                result_data = json.loads(row[5])
            except json.JSONDecodeError:
                pass
        return TaskResult(
            task_id=row[0],
            status=TaskStatus(row[1]),
            progress=row[2],
            total=row[3],
            message=row[4],
            result_data=result_data,
            error=row[6],
            created_at=row[7],
            updated_at=row[8],
            completed_at=row[9]
        )

    async def update_task_progress(
        self,
        task_id: str,
        progress: float,
        total: Optional[float] = None,
        message: Optional[str] = None
    ) -> None:
        """Updates the task progress."""
        task_result = await self.get_task(task_id)
        if not task_result:
            return
        task_result.progress = progress
        if total is not None:
            task_result.total = total
        if message is not None:
            task_result.message = message
        task_result.updated_at = time.time()
        await self._save_task(task_result)

    async def _update_task_status(self, task_id: str, status: TaskStatus) -> None:
        """Updates the task status."""
        task_result = await self.get_task(task_id)
        if not task_result:
            return
        task_result.status = status
        task_result.updated_at = time.time()
        await self._save_task(task_result)

    async def _update_task_completion(self, task_id: str, result_data: Any) -> None:
        """Marks the task as completed with results."""
        task_result = await self.get_task(task_id)
        if not task_result:
            return
        task_result.status = TaskStatus.COMPLETED
        task_result.progress = task_result.total or 100.0
        task_result.result_data = result_data if isinstance(result_data, dict) else {"result": result_data}
        task_result.completed_at = time.time()
        task_result.updated_at = time.time()
        await self._save_task(task_result)

    async def _update_task_error(self, task_id: str, error: str) -> None:
        """Marks the task as failed with an error."""
        task_result = await self.get_task(task_id)
        if not task_result:
            return
        task_result.status = TaskStatus.FAILED
        task_result.error = error
        task_result.updated_at = time.time()
        await self._save_task(task_result)

    async def cancel_task(self, task_id: str) -> bool:
        """Cancels a running background task."""
        if task_id in self._background_tasks:
            task = self._background_tasks[task_id]
            task.cancel()
            await self._update_task_status(task_id, TaskStatus.CANCELLED)
            return True
        return False

    async def list_tasks(self, status_filter: Optional[TaskStatus] = None) -> List[TaskResult]:
        """Lists all tasks, optionally filtered by status."""
        conn = sqlite3.connect(self.db_path)
        if status_filter:
            cursor = conn.execute("""
                SELECT task_id, status, progress, total, message, result_data, error,
                       created_at, updated_at, completed_at
                FROM tasks WHERE status = ? ORDER BY updated_at DESC
            """, (status_filter.value,))
        else:
            cursor = conn.execute("""
                SELECT task_id, status, progress, total, message, result_data, error,
                       created_at, updated_at, completed_at
                FROM tasks ORDER BY updated_at DESC
            """)
        rows = cursor.fetchall()
        conn.close()
        tasks = []
        for row in rows:
            result_data = None
            if row[5]:
                try:
                    result_data = json.loads(row[5])
                except json.JSONDecodeError:
                    pass
            tasks.append(TaskResult(
                task_id=row[0],
                status=TaskStatus(row[1]),
                progress=row[2],
                total=row[3],
                message=row[4],
                result_data=result_data,
                error=row[6],
                created_at=row[7],
                updated_at=row[8],
                completed_at=row[9]
            ))
        return tasks


# Create the MCP server with durability capabilities
server = FastMCP(
    name="MCP Durability Server",
    instructions="An MCP server that demonstrates durability for long-running agents"
)

# Initialize the task manager
task_manager = DurableTaskManager()


# Resources to track the state of tasks
@server.resource("task://status/{task_id}")
async def get_task_status(task_id: str) -> str:
    """Gets the current status of a task by ID."""
    task_result = await task_manager.get_task(task_id)
    if not task_result:
        return json.dumps({"error": "Task not found", "task_id": task_id})
    data = {
        "task_id": task_result.task_id,
        "status": task_result.status.value,
        "progress": task_result.progress,
        "total": task_result.total,
        "message": task_result.message,
        "result_data": task_result.result_data,
        "error": task_result.error,
        "created_at": task_result.created_at,
        "updated_at": task_result.updated_at,
        "completed_at": task_result.completed_at,
    }
    return json.dumps(data)


@server.resource("task://list")
async def list_all_tasks() -> str:
    """Lists all tasks with their current status."""
    tasks = await task_manager.list_tasks()
    data = {
        "tasks": [
            {
                "task_id": task.task_id,
                "status": task.status.value,
                "progress": task.progress,
                "total": task.total,
                "message": task.message,
                "created_at": task.created_at,
                "updated_at": task.updated_at,
                "completed_at": task.completed_at,
                "has_result": task.result_data is not None,
                "has_error": task.error is not None
            } for task in tasks
        ],
        "total": len(tasks)
    }
    return json.dumps(data)


@server.resource("task://list/{status}")
async def list_tasks_by_status(status: str) -> str:
    """Lists tasks filtered by status."""
    try:
        status_enum = TaskStatus(status.lower())
        tasks = await task_manager.list_tasks(status_enum)
        data = {
            "tasks": [
                {
                    "task_id": task.task_id,
                    "status": task.status.value,
                    "progress": task.progress,
                    "total": task.total,
                    "message": task.message,
                    "created_at": task.created_at,
                    "updated_at": task.updated_at,
                    "completed_at": task.completed_at
                } for task in tasks
            ],
            "status_filter": status,
            "total": len(tasks)
        }
        return json.dumps(data)
    except ValueError:
        data = {
            "error": f"Invalid status: {status}",
            "valid_statuses": [s.value for s in TaskStatus]
        }
        return json.dumps(data)


# Tools to demonstrate durability functionality
@server.tool
async def start_data_migration(
    source_path: str,
    destination_path: str,
    ctx: Context,
    record_count: int = 1000
) -> str:
    """
    Starts a long-running data migration task.

    Args:
        source_path: Source path of the data
        destination_path: Destination path of the data
        record_count: Number of records to migrate
        ctx: MCP context

    Returns:
        Resource URI to track progress
    """
    await ctx.info(f"Starting migration of {record_count} records from {source_path} to {destination_path}")
    print(f"Starting migration of {record_count} records from {source_path} to {destination_path}")

    async def migration_task(task_id: str, context: Context, manager: DurableTaskManager):
        """Simulated data migration task."""
        batch_size = 100
        migrated = 0
        for batch_start in range(0, record_count, batch_size):
            # Simulate batch processing
            await asyncio.sleep(1)  # Simulate work
            batch_end = min(batch_start + batch_size, record_count)
            migrated = batch_end
            # Update progress
            progress_pct = (migrated / record_count) * 100
            message = f"Migrated {migrated}/{record_count} records"
            await manager.update_task_progress(task_id, progress_pct, 100.0, message)
            try:
                await context.report_progress(migrated, record_count, message)
            except Exception:
                pass
        try:
            await context.info(f"Data migration completed: {migrated} records")
        except Exception:
            pass
        return {
            "migrated_records": migrated,
            "total_records": record_count,
            "source_path": source_path,
            "destination_path": destination_path,
            "success": True,
            "completion_time": time.time()
        }

    # Create the task and start background processing
    task_id = await task_manager.create_task()
    await task_manager.start_background_task(task_id, migration_task, ctx)
    resource_link = f"task://status/{task_id}"
    await ctx.info(f"Data migration started. Track progress at: {resource_link}")
    return resource_link


@server.tool
async def start_batch_processing(
    batch_size: int,
    total_items: int,
    ctx: Context,
    processing_delay: float = 10
) -> str:
    """
    Starts a batch processing task.

    Args:
        batch_size: Size of each batch
        total_items: Total number of items to process
        processing_delay: Delay per batch in seconds
        ctx: MCP context

    Returns:
        Resource URI to track progress
    """
    await ctx.info(f"Starting batch processing: {total_items} items in batches of {batch_size}")

    async def batch_task(task_id: str, context: Context, manager: DurableTaskManager):
        """Batch processing task."""
        processed = 0
        for i in range(0, total_items, batch_size):
            await asyncio.sleep(processing_delay)  # Simulate work
            batch_end = min(i + batch_size, total_items)
            processed = batch_end
            progress = (processed / total_items) * 100
            message = f"Processed {processed}/{total_items} items"
            await manager.update_task_progress(task_id, progress, 100.0, message)
            try:
                await context.report_progress(processed, total_items, message)
            except Exception:
                pass
        try:
            await context.info(f"Batch processing completed: {processed} items")
        except Exception:
            pass
        return {
            "processed": processed,
            "total": total_items,
            "batch_size": batch_size,
            "processing_delay": processing_delay,
            "success": True
        }

    task_id = await task_manager.create_task()
    await task_manager.start_background_task(task_id, batch_task, ctx)
    resource_link = f"task://status/{task_id}"
    await ctx.info(f"Batch processing started. Track progress at: {resource_link}")
    return resource_link


@server.tool
async def start_ml_training(
    model_name: str,
    ctx: Context,
    dataset_size: int = 10000,
    epochs: int = 100
) -> str:
    """
    Simulates machine learning model training.

    Args:
        model_name: Name of the model to train
        dataset_size: Size of the dataset
        epochs: Number of training epochs
        ctx: MCP context

    Returns:
        Resource URI to track progress
    """
    await ctx.info(f"Starting training of model '{model_name}' with {dataset_size} samples over {epochs} epochs")

    async def training_task(task_id: str, context: Context, manager: DurableTaskManager):
        """Simulated ML training task."""
        for epoch in range(1, epochs + 1):
            # Simulate training one epoch
            await asyncio.sleep(10)
            # Simulate training metrics
            loss = 1.0 - (epoch / epochs) * 0.8 + (epoch % 10) * 0.01
            accuracy = (epoch / epochs) * 0.95 + (epoch % 5) * 0.002
            progress = (epoch / epochs) * 100
            message = f"Epoch {epoch}/{epochs} - Loss: {loss:.4f}, Accuracy: {accuracy:.4f}"
            await manager.update_task_progress(task_id, progress, 100.0, message)
            try:
                await context.report_progress(epoch, epochs, message)
            except Exception:
                pass
        try:
            await context.info(f"Model '{model_name}' training completed")
        except Exception:
            pass
        return {
            "model_name": model_name,
            "dataset_size": dataset_size,
            "epochs_completed": epochs,
            "final_loss": loss,
            "final_accuracy": accuracy,
            "success": True,
            "training_time_seconds": epochs * 0.5
        }

    task_id = await task_manager.create_task()
    await task_manager.start_background_task(task_id, training_task, ctx)
    resource_link = f"task://status/{task_id}"
    await ctx.info(f"Training started. Track progress at: {resource_link}")
    return resource_link


@server.tool
async def cancel_task(task_id: str, ctx: Context) -> str:
    """
    Cancels an in-progress task.

    Args:
        task_id: ID of the task to cancel
        ctx: MCP context

    Returns:
        Status message of the cancellation
    """
    success = await task_manager.cancel_task(task_id)
    if success:
        await ctx.info(f"Task {task_id} cancelled successfully")
        return f"Task {task_id} cancelled. Status available at: task://status/{task_id}"
    else:
        await ctx.warning(f"Task {task_id} not found or cannot be cancelled")
        return f"Task {task_id} not found or cannot be cancelled"


@server.tool
async def get_server_stats(ctx: Context) -> Dict[str, Any]:
    """
    Gets durability server statistics.

    Args:
        ctx: MCP context

    Returns:
        Server statistics
    """
    all_tasks = await task_manager.list_tasks()
    stats = {
        "total_tasks": len(all_tasks),
        "running_tasks": len([t for t in all_tasks if t.status == TaskStatus.RUNNING]),
        "completed_tasks": len([t for t in all_tasks if t.status == TaskStatus.COMPLETED]),
        "failed_tasks": len([t for t in all_tasks if t.status == TaskStatus.FAILED]),
        "cancelled_tasks": len([t for t in all_tasks if t.status == TaskStatus.CANCELLED]),
        "pending_tasks": len([t for t in all_tasks if t.status == TaskStatus.PENDING]),
        "active_background_tasks": len(task_manager._background_tasks),
    }
    await ctx.info(f"Server statistics: {stats['total_tasks']} total tasks, {stats['running_tasks']} running")
    return stats


if __name__ == "__main__":
    print("Durability MCP server started")
    print("Available tools:")
    print(" - start_data_migration: Starts data migration")
    print(" - start_batch_processing: Starts batch processing")
    print(" - start_ml_training: Simulates ML training")
    print(" - cancel_task: Cancels a task")
    print(" - get_server_stats: Gets server statistics")
    print("Available resources:")
    print(" - task://status/{task_id}: Specific task status")
    print(" - task://list: Lists all tasks")
    print(" - task://list/{status}: Lists tasks by status")
    server.run(transport="http", host="127.0.0.1", port=8080)

Writing MCP_durability_server/server.py

MCP Client with Durability

The client will be very similar to the ones we created in the streamable MCP and resumable MCP posts.

Implementing the MCP Client with Durability

There will be a DurabilityClient class with methods to connect to the server, start the three available tasks (data migration, batch processing, and model training), get the status of a task, list all tasks, and cancel a task.
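The heart of the client is its polling loop: read the status resource, show progress, and stop at a terminal state. A stripped-down, stdlib-only sketch of that decision logic is shown here; the `fetch` callable is a hypothetical stand-in for `client.read_resource`, returning the JSON the server's `task://status/{task_id}` resource produces.

```python
import json
import time
from typing import Any, Callable, Dict

TERMINAL = {"completed", "failed", "cancelled"}

def poll_until_complete(
    fetch: Callable[[], str],  # stand-in for reading task://status/{task_id}
    poll_interval: float = 0.0,
    max_polls: int = 100,
) -> Dict[str, Any]:
    """Poll a JSON status payload until the task reaches a terminal state."""
    status: Dict[str, Any] = {}
    for _ in range(max_polls):
        status = json.loads(fetch())
        if status.get("status") in TERMINAL:
            return status
        time.sleep(poll_interval)
    return status  # timed out: return the last status seen

# Simulated server: the task completes on the third read.
reads = iter([
    '{"status": "pending", "progress": 0}',
    '{"status": "running", "progress": 50}',
    '{"status": "completed", "progress": 100}',
])
final = poll_until_complete(lambda: next(reads))
print(final)  # {'status': 'completed', 'progress': 100}
```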

Create the Client Virtual Environment

First we create the folder where we will develop it.

!mkdir MCP_durability_client

We create the project with uv

!cd MCP_durability_client && uv init .

Initialized project `mcp-durability-client` at `/Users/macm1/Documents/web/portafolio/posts/MCP_durability_client`

We initialize the environment

!cd MCP_durability_client && uv venv

Using CPython 3.12.8
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate

We install the required libraries

!cd MCP_durability_client && uv add fastmcp

Resolved 64 packages in 17ms
Installed 61 packages in 80ms
+ annotated-types==0.7.0
+ anyio==4.10.0
+ attrs==25.3.0
+ authlib==1.6.3
+ certifi==2025.8.3
+ cffi==1.17.1
+ charset-normalizer==3.4.3
+ click==8.2.1
+ cryptography==45.0.6
+ cyclopts==3.23.0
+ dnspython==2.7.0
+ docstring-parser==0.17.0
+ docutils==0.22
+ email-validator==2.3.0
+ exceptiongroup==1.3.0
+ fastmcp==2.11.3
+ h11==0.16.0
+ httpcore==1.0.9
+ httpx==0.28.1
+ httpx-sse==0.4.1
+ idna==3.10
+ isodate==0.7.2
+ jsonschema==4.25.1
+ jsonschema-path==0.3.4
+ jsonschema-specifications==2025.4.1
+ lazy-object-proxy==1.12.0
+ markdown-it-py==4.0.0
+ markupsafe==3.0.2
+ mcp==1.13.1
+ mdurl==0.1.2
+ more-itertools==10.7.0
+ openapi-core==0.19.5
+ openapi-pydantic==0.5.1
+ openapi-schema-validator==0.6.3
+ openapi-spec-validator==0.7.2
+ parse==1.20.2
+ pathable==0.4.4
+ pycparser==2.22
+ pydantic==2.11.7
+ pydantic-core==2.33.2
+ pydantic-settings==2.10.1
+ pygments==2.19.2
+ pyperclip==1.9.0
+ python-dotenv==1.1.1
+ python-multipart==0.0.20
+ pyyaml==6.0.2
+ referencing==0.36.2
+ requests==2.32.5
+ rfc3339-validator==0.1.4
+ rich==14.1.0
+ rich-rst==1.3.1
+ rpds-py==0.27.1
+ six==1.17.0
+ sniffio==1.3.1
+ sse-starlette==3.0.2
+ starlette==0.47.3
+ typing-extensions==4.15.0
+ typing-inspection==0.4.1
+ urllib3==2.5.0
+ uvicorn==0.35.0
+ werkzeug==3.1.1

Client Code

%%writefile MCP_durability_client/client.py
"""
Durability MCP client

Client that demonstrates how to interact with an MCP server that implements durability.

Shows how to:
1. Start long-running tasks
2. Monitor progress using resource polling
3. Subscribe to status updates (simulated)
4. Handle persistent tasks that survive server restarts

Usage:
    python client.py
"""
import argparse
import asyncio
import json
import time
from typing import Any, Dict, List, Optional

from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport


class DurabilityClient:
    """Client that demonstrates durability patterns with MCP."""

    def __init__(self, server_command: List[str]):
        """
        Initializes the durability client.

        Args:
            server_command: Command to execute the MCP server
        """
        self.server_command = server_command
        self._polling_tasks: Dict[str, asyncio.Task] = {}

    async def connect(self) -> Client:
        """Creates and connects the client to the server."""
        transport = StreamableHttpTransport(
            url="http://127.0.0.1:8080/mcp"
        )
        client = Client(transport)
        return client

    async def start_data_migration(
        self,
        client: Client,
        source_path: str = "/data/source",
        destination_path: str = "/data/destination",
        record_count: int = 1000
    ) -> str:
        """
        Starts a data migration task and returns the resource link.

        Args:
            client: Connected MCP client
            source_path: Source path
            destination_path: Destination path
            record_count: Number of records to migrate

        Returns:
            Resource URI to track progress
        """
        print(f"🚀 Starting data migration: {record_count} records")
        print(f"   Source: {source_path}")
        print(f"   Destination: {destination_path}")
        result = await client.call_tool(
            "start_data_migration",
            {
                "source_path": source_path,
                "destination_path": destination_path,
                "record_count": record_count
            }
        )
        resource_uri = result.content[0].text
        print(f"✅ Task started. Resource URI: {resource_uri}")
        return resource_uri

    async def start_batch_processing(
        self,
        client: Client,
        batch_size: int = 50,
        total_items: int = 500,
        processing_delay: float = 0.3
    ) -> str:
        """
        Starts batch processing.

        Args:
            client: Connected MCP client
            batch_size: Batch size
            total_items: Total items
            processing_delay: Processing delay

        Returns:
            Resource URI to track progress
        """
        print("🔄 Starting batch processing:")
        print(f"   Total items: {total_items}")
        print(f"   Batch size: {batch_size}")
        print(f"   Processing delay: {processing_delay}s")
        result = await client.call_tool(
            "start_batch_processing",
            {
                "batch_size": batch_size,
                "total_items": total_items,
                "processing_delay": processing_delay
            }
        )
        resource_uri = result.content[0].text
        print(f"✅ Batch processing started. Resource URI: {resource_uri}")
        return resource_uri

    async def start_ml_training(
        self,
        client: Client,
        model_name: str = "clasificador-texto",
        dataset_size: int = 5000,
        epochs: int = 50
    ) -> str:
        """
        Starts ML model training.

        Args:
            client: Connected MCP client
            model_name: Model name
            dataset_size: Dataset size
            epochs: Number of epochs

        Returns:
            Resource URI to track progress
        """
        print("🤖 Starting ML training:")
        print(f"   Model: {model_name}")
        print(f"   Dataset size: {dataset_size} samples")
        print(f"   Epochs: {epochs}")
        result = await client.call_tool(
            "start_ml_training",
            {
                "model_name": model_name,
                "dataset_size": dataset_size,
                "epochs": epochs
            }
        )
        resource_uri = result.content[0].text
        print(f"✅ ML training started. Resource URI: {resource_uri}")
        return resource_uri

    async def get_task_status(self, client: Client, resource_uri: str) -> Dict[str, Any]:
        """
        Gets the current status of a task.

        Args:
            client: Connected MCP client
            resource_uri: Resource URI of the task

        Returns:
            Current status of the task
        """
        try:
            result = await client.read_resource(resource_uri)
            # result should be a list of ReadResourceContents
            if isinstance(result, list) and len(result) > 0:
                content_block = result[0]
                # The content is in the 'text' attribute
                if hasattr(content_block, 'text') and content_block.text:
                    return json.loads(content_block.text)
                else:
                    return {"error": f"No text found in the resource. Type: {type(content_block)}, text={getattr(content_block, 'text', None)}"}
            else:
                return {"error": "Empty resource"}
        except Exception as e:
            return {"error": f"Error reading resource: {str(e)}"}

    async def poll_task_until_complete(
        self,
        client: Client,
        resource_uri: str,
        poll_interval: float = 1.0,
        max_duration: float = 10.0
    ) -> Dict[str, Any]:
        """
        Polls a task until it completes or fails.

        Args:
            client: Connected MCP client
            resource_uri: Resource URI of the task
            poll_interval: Interval between polls in seconds
            max_duration: Maximum polling duration

        Returns:
            Final status of the task
        """
        task_id = resource_uri.split("/")[-1]
        print(f"📊 Monitoring task {task_id}...")
        start_time = time.time()
        last_progress = -1
        while time.time() - start_time < max_duration:
            status = await self.get_task_status(client, resource_uri)
            if "error" in status and status["error"] is not None:
                print(f"Error getting status: {status['error']}")
                return status
            current_status = status.get("status", "unknown")
            progress = status.get("progress", 0)
            message = status.get("message", "")
            # Show updates only if the progress changed
            if progress != last_progress:
                progress_bar = self._create_progress_bar(progress)
                print(f"   {progress_bar} {progress:5.1f}% | {current_status.upper()} | {message}")
                last_progress = progress
            # Check if the task is complete
            if current_status in ["completed", "failed", "cancelled"]:
                print(f"Task {current_status}: {task_id}")
                if current_status == "completed" and status.get("result_data"):
                    print("Results:")
                    self._print_results(status["result_data"])
                elif current_status == "failed" and status.get("error"):
                    print(f"Error: {status['error']}")
                return status
            await asyncio.sleep(poll_interval)
        print(f"Timeout for task {task_id}")
        return await self.get_task_status(client, resource_uri)

    def _create_progress_bar(self, progress: float, width: int = 20) -> str:
        """Creates a visual progress bar."""
        filled = int(width * progress / 100)
        bar = "█" * filled + "░" * (width - filled)
        return f"[{bar}]"

    def _print_results(self, result_data: Dict[str, Any], indent: int = 4) -> None:
        """Prints the results in a formatted way."""
        spaces = " " * indent
        for key, value in result_data.items():
            if isinstance(value, dict):
                print(f"{spaces}{key}:")
                self._print_results(value, indent + 2)
            elif isinstance(value, list):
                print(f"{spaces}{key}: {len(value)} items")
            else:
                print(f"{spaces}{key}: {value}")

    async def list_all_tasks(self, client: Client) -> Dict[str, Any]:
        """Lists all tasks on the server."""
        try:
            result = await client.read_resource("task://list")
            # result should be a list of ReadResourceContents
            if isinstance(result, list) and len(result) > 0:
                content_block = result[0]
                # The content is in the 'text' attribute
                if hasattr(content_block, 'text') and content_block.text:
                    return json.loads(content_block.text)
                else:
                    return {"error": f"No text found in the resource. Type: {type(content_block)}"}
            else:
                return {"error": "Empty resource"}
        except Exception as e:
            return {"error": f"Error reading task list: {str(e)}"}

    async def list_tasks_by_status(self, client: Client, status: str) -> Dict[str, Any]:
        """Lists tasks filtered by status."""
        try:
            result = await client.read_resource(f"task://list/{status}")
            # result should be a list of ReadResourceContents
            if isinstance(result, list) and len(result) > 0:
                content_block = result[0]
                # The content is in the 'text' attribute
                if hasattr(content_block, 'text') and content_block.text:
                    return json.loads(content_block.text)
                else:
                    return {"error": f"No text found in the resource. Type: {type(content_block)}"}
            else:
                return {"error": "Empty resource"}
        except Exception as e:
            return {"error": f"Error reading task list: {str(e)}"}

    async def print_task_list(self, client: Client) -> None:
        """Prints a formatted list of all tasks."""
        print("Task list:")
        task_list = await self.list_all_tasks(client)
        if "error" in task_list:
            print(f"   {task_list['error']}")
            return
        tasks = task_list.get("tasks", [])
        if not tasks:
            print("   No tasks")
            return
        for task in tasks:
            task_id = task["task_id"]
            status = task["status"].upper()
            progress = task.get("progress", 0)
            message = task.get("message", "")
            print(f"   ID: {task_id} | {status:9} | {progress:5.1f}% | {message}")

    async def select_task(self, client: Client) -> Optional[str]:
        """
        Shows a numbered list of tasks and lets the user select one.

        Args:
            client: Connected MCP client

        Returns:
            The ID of the selected task, or None if no task was selected
        """
        print("📋 Available tasks:")
        task_list = await self.list_all_tasks(client)
        if "error" in task_list:
            print(f"   ❌ {task_list['error']}")
            return None
        tasks = task_list.get("tasks", [])
        if not tasks:
            print("   No tasks available")
            return None
        # Show tasks with numbers
        for i, task in enumerate(tasks, 1):
            task_id = task["task_id"]
            status = task["status"].upper()
            progress = task.get("progress", 0)
            message = task.get("message", "")
            print(f"   {i}. ID: {task_id} | {status:9} | {progress:5.1f}% | {message}")
        # Ask the user to select
        try:
            choice = input(f"   Select a task (1-{len(tasks)}) or press Enter to cancel: ").strip()
            if not choice:
                print("Selection cancelled")
                return None
            choice_num = int(choice)
            if 1 <= choice_num <= len(tasks):
                selected_task = tasks[choice_num - 1]
                selected_id = selected_task["task_id"]
                print(f"✅ Selected task: {selected_id}")
                return selected_id
            else:
                print(f"❌ Invalid selection. Please choose between 1 and {len(tasks)}")
                return None
        except ValueError:
            print("❌ Invalid input. Please enter a number")
            return None
        except Exception as e:
            print(f"❌ Error selecting task: {str(e)}")
            return None

    async def cancel_task(self, client: Client, task_id: str) -> str:
        """Cancels a specific task."""
        try:
            result = await client.call_tool("cancel_task", {"task_id": task_id})
            response = result.content[0].text
            print(f"🚫 {response}")
            return response
        except Exception as e:
            error_msg = f"Error cancelling task: {str(e)}"
            print(f"❌ {error_msg}")
            return error_msg

    async def get_server_stats(self, client: Client) -> Dict[str, Any]:
        """Gets server statistics."""
        try:
            result = await client.call_tool("get_server_stats", {})
            # result.content is a list of ContentBlock
            if hasattr(result, 'content') and len(result.content) > 0:
                content = result.content[0]
                if hasattr(content, 'text'):
                    return json.loads(content.text)
                else:
                    # If not text, it may be the object directly
                    return content if isinstance(content, dict) else {"error": f"Unexpected format: {type(content)}"}
            else:
                return {"error": "Empty response from get_server_stats"}
        except Exception as e:
            return {"error": f"Error getting server statistics: {str(e)}"}

    async def print_server_stats(self, client: Client) -> None:
        """Prints server statistics."""
        print("📊 Server statistics:")
        stats = await self.get_server_stats(client)
        if "error" in stats:
            print(f"   ❌ {stats['error']}")
            return
        print(f"   Total tasks: {stats.get('total_tasks', 0)}")
        print(f"   Running: {stats.get('running_tasks', 0)}")
        print(f"   Completed: {stats.get('completed_tasks', 0)}")
        print(f"   Failed: {stats.get('failed_tasks', 0)}")
        print(f"   Cancelled: {stats.get('cancelled_tasks', 0)}")
        print(f"   Pending: {stats.get('pending_tasks', 0)}")
        print(f"   Active background tasks: {stats.get('active_background_tasks', 0)}")


async def interactive_demo(server_path: Optional[str] = None):
    """Interactive demo to explore the system."""
    print("🚀 Interactive Demo of the MCP Durability System")
    print("=" * 55)
    client_manager = DurabilityClient([])
    async with await client_manager.connect() as client:
        print("🔗 Connected to the durability server")
        while True:
print(" " + "=" * 40)
print("Available options:")
print("1. Start data migration")
print("2. Start batch processing")
print("3. Start ML training")
print("4. View server statistics")
print("5. View task list")
print("6. Monitor specific task")
print("7. Cancel task")
print("8. Exit")
try:
choice = input(" Select an option (1-8): ").strip()
if choice == "1":
records = int(input("Number of records to migrate (default 500): ") or "500")
uri = await client_manager.start_data_migration(client, record_count=records)
print(f"Task URI: {uri}")
elif choice == "2":
total = int(input("Total elements (default 300): ") or "300")
batch_size = int(input("Batch size (default 30): ") or "30")
uri = await client_manager.start_batch_processing(client, total_items=total, batch_size=batch_size)
print(f"Task URI: {uri}")
elif choice == "3":
model_name = input("Model name (default 'demo-model'): ") or "demo-model"
epochs = int(input("Number of epochs (default 25): ") or "25")
uri = await client_manager.start_ml_training(client, model_name=model_name, epochs=epochs)
print(f"Task URI: {uri}")
elif choice == "4":
await client_manager.print_server_stats(client)
elif choice == "5":
await client_manager.print_task_list(client)
elif choice == "6":
task_id = await client_manager.select_task(client)
if task_id:
uri = f"task://status/{task_id}"
await client_manager.poll_task_until_complete(client, uri)
elif choice == "7":
task_id = await client_manager.select_task(client)
if task_id:
await client_manager.cancel_task(client, task_id)
elif choice == "8":
print("👋 Bye!")
break
else:
print("❌ Invalid option. Please select 1-8.")
except KeyboardInterrupt:
print(" 👋 Demo interrupted. Bye!")
break
except Exception as e:
print(f"❌ Error: {str(e)}")
if __name__ == "__main__":
try:
asyncio.run(interactive_demo())
except KeyboardInterrupt:
print(" 👋 Bye!")
except Exception as e:
print(f"❌ Error: {str(e)}")
Copy
	
Writing MCP_durability_client/client.py

Executionlink image 29

First, we start the server.

	
!cd MCP_durability_server && source .venv/bin/activate && uv run server.py
Copy
	
Durability MCP server started
Available tools:
- start_data_migration: Starts data migration
- start_batch_processing: Starts batch processing
- start_ml_training: Simulates ML training
- cancel_task: Cancels a task
- get_server_stats: Gets server statistics
Resources available:
- task://status/{task_id}: Specific task status
- task://list: Lists all tasks
- task://list/{status}: Lists tasks by status
╭─ FastMCP 2.0 ──────────────────────────────────────────────────────────────╮
│ │
│ _ __ ___ ______ __ __ _____________ ____ ____ │
│ _ __ ___ / ____/___ ______/ /_/ |/ / ____/ __ |___ / __ │
│ _ __ ___ / /_ / __ `/ ___/ __/ /|_/ / / / /_/ / ___/ / / / / / │
│ _ __ ___ / __/ / /_/ (__ ) /_/ / / / /___/ ____/ / __/_/ /_/ / │
│ _ __ ___ /_/ __,_/____/__/_/ /_/____/_/ /_____(_)____/ │
│ │
│ │
│ │
│ 🖥️ Server name: MCP Durability Server │
│ 📦 Transport: Streamable-HTTP │
│ 🔗 Server URL: http://127.0.0.1:8080/mcp │
│ │
│ 📚 Docs: https://gofastmcp.com │
│ 🚀 Deploy: https://fastmcp.cloud │
│ │
│ 🏎️ FastMCP version: 2.11.3 │
│ 🤝 MCP version: 1.13.1 │
│ │
╰────────────────────────────────────────────────────────────────────────────╯
[08/28/25 11:46:08] INFO     Starting MCP server 'MCP Durability Server' with transport 'http' on http://127.0.0.1:8080/mcp    server.py:1522
INFO: Started server process [55234]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8080 (Press CTRL+C to quit)

Now we run the client.

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Copy
	
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server
========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8):

First, we select option 5 to confirm that no tasks are running.

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Copy
	
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server
========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 5
Task list:
No tasks
========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8):

We see the following output:

Task list:
No tasks

There are no tasks running.

Now we select option 1 to start the data migration task, choosing 100,000 records so that the task runs long enough for everything we need to demonstrate in this post.

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Copy
	
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 1
Number of records to migrate (default 500): 100000
🚀 Starting data migration: 100000 records
Source: /data/source
Destination: /data/destination
[08/28/25 12:05:22] INFO     Server log: Starting migration of 100000 records from /data/source to /data/destination    logging.py:40
                    INFO     Server log: Data migration started. Track progress at: task://status/821a210a-8672-4eba-b27c-500bf63e58c1    logging.py:40
✅ Task started. Resource URI: task://status/821a210a-8672-4eba-b27c-500bf63e58c1
Task URI: task://status/821a210a-8672-4eba-b27c-500bf63e58c1
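The server hands back a resource URI of the form `task://status/{task_id}`. A client that persisted only this URI between sessions can recover the task ID in order to reconnect and monitor the task later. A minimal sketch (the helper name is ours, not part of the post's code):

```python
from urllib.parse import urlparse

def task_id_from_uri(uri: str) -> str:
    """Extract the task ID from a task://status/<task_id> resource URI."""
    # For "task://status/<id>": scheme is "task", netloc is "status",
    # and the path is "/<id>"
    parsed = urlparse(uri)
    if parsed.scheme != "task" or parsed.netloc != "status":
        raise ValueError(f"Not a task status URI: {uri}")
    return parsed.path.lstrip("/")
```

This is simply the inverse of what option 6 in the demo does when it rebuilds the URI with `f"task://status/{task_id}"`.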

We select option 5 to view the task list.

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Copy
	
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 5
Task list:
ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING | 1.0% | Migrated 1000/100000 records

The task we just requested from the MCP server now appears; we can see its ID and status, with 1,000 of the 100,000 records migrated so far.

We select option 8 to close the client.

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Copy
	
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 8
👋 Bye!

We run the client again.

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Copy
	
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server
========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8):

We select option 5 again to see the list of tasks running on the server.

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Copy
	
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server
========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 5
Task list:
ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING | 3.4% | Migrated 3400/100000 records

We can see that the task kept running while the client was disconnected and has now processed more records: it was at 1,000 before, and it is at 3,400 now.
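This is the heart of durability: the work runs in a background task owned by the server process and its state is persisted server-side, so it does not depend on any client connection. A toy illustration of that decoupling with plain asyncio (no MCP involved; all names here are illustrative):

```python
import asyncio

state = {"progress": 0}  # stands in for the server's persisted task state

async def long_task():
    # The "server-side" work: keeps going no matter who is connected
    for _ in range(5):
        await asyncio.sleep(0.02)
        state["progress"] += 1

async def main():
    task = asyncio.create_task(long_task())  # server launches the background task
    await asyncio.sleep(0.05)                # a client connects, peeks, disconnects
    first_look = state["progress"]
    await task                               # the work finishes on its own
    return first_look, state["progress"]

first, final = asyncio.run(main())
```

The peek sees partial progress, and the task reaches completion regardless of whether anyone is watching, which is exactly the behavior observed in the transcript above.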

Now we select option 6 to monitor the task we requested from the server. This will allow us to monitor it for 10 seconds.

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Copy
	
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 6
📋 Available tasks:
1. ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING | 4.2% | Migrated 4200/100000 records
Select a task (1-1) or press Enter to cancel: 1
✅ Selected task: 821a210a-8672-4eba-b27c-500bf63e58c1
📊 Monitoring task 821a210a-8672-4eba-b27c-500bf63e58c1...
[░░░░░░░░░░░░░░░░░░░░] 4.9% | RUNNING | Migrated 4900/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.0% | RUNNING | Migrated 5000/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.1% | RUNNING | Migrated 5100/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.2% | RUNNING | Migrated 5200/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.3% | RUNNING | Migrated 5300/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.4% | RUNNING | Migrated 5400/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.5% | RUNNING | Migrated 5500/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.6% | RUNNING | Migrated 5600/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.7% | RUNNING | Migrated 5700/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.8% | RUNNING | Migrated 5800/100000 records
Timeout for task 821a210a-8672-4eba-b27c-500bf63e58c1
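The progress bars in the transcript above are easy to reproduce; here is a sketch of a renderer matching the 20-character format shown (our own hypothetical helper, not the post's exact code):

```python
def render_progress(progress: float, width: int = 20) -> str:
    """Render a [█░░…] style progress bar for a percentage in [0, 100]."""
    filled = int(progress / 100 * width)  # number of filled cells
    bar = "█" * filled + "░" * (width - filled)
    return f"[{bar}] {progress:.1f}%"
```

For example, 5.8% fills one cell of twenty, matching the `[█░░░░░░░░░░░░░░░░░░░] 5.8%` line above.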

Finally, we cancel the task using option 7.

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Copy
	
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 7
📋 Available tasks:
1. ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING | 6.1% | Migrated 6100/100000 records
Select a task (1-1) or press Enter to cancel: 1
✅ Selected task: 821a210a-8672-4eba-b27c-500bf63e58c1
[08/28/25 12:06:25] INFO     Server log: Task 821a210a-8672-4eba-b27c-500bf63e58c1 cancelled successfully    logging.py:40
🚫 Task 821a210a-8672-4eba-b27c-500bf63e58c1 cancelled. Status available at: task://status/821a210a-8672-4eba-b27c-500bf63e58c1

Now we select option 5 again to see the list of tasks.

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Copy
	
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 5
Task list:
ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | CANCELLED | 6.3% | Migrated 6300/100000 records

We can see that the task we requested from the server now appears as cancelled (CANCELLED).
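Cooperative cancellation like this is typically implemented by having the background loop re-read its status from the database on every iteration, so a cancel takes effect at the next step while the row, with its last recorded progress, persists. A minimal sketch with an in-memory SQLite table (the schema and names are illustrative, not the post's actual server code):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (task_id TEXT PRIMARY KEY, status TEXT, progress REAL)")
conn.execute("INSERT INTO tasks VALUES (?, 'running', 0.0)", ("demo-task",))

def cancel(task_id: str) -> None:
    """What the cancel_task tool would do: flip the persisted status."""
    conn.execute("UPDATE tasks SET status = 'cancelled' WHERE task_id = ?", (task_id,))

def migration_step(task_id: str, step_pct: float) -> bool:
    """One iteration of the background loop; returns False when it should stop."""
    status, progress = conn.execute(
        "SELECT status, progress FROM tasks WHERE task_id = ?", (task_id,)
    ).fetchone()
    if status != "running":
        return False  # stop the loop; the row (with its last progress) persists
    conn.execute("UPDATE tasks SET progress = ? WHERE task_id = ?",
                 (progress + step_pct, task_id))
    return True

migration_step("demo-task", 0.1)   # one unit of work gets done
cancel("demo-task")                # client cancels between iterations
migration_step("demo-task", 0.1)   # next iteration sees the flag and stops
```

Because the status lives in the database rather than in the connection, the CANCELLED row remains queryable afterwards, which is why option 5 still lists the task with its final 6.3% progress.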
