MCP Durability: Server and Client with Persistence for Long-Running Tasks


In this post we're going to see how to build an MCP server and client where the client asks the server for a long-running task, the client stops, the task keeps running on the server, and the client can later reconnect and check the task's status.

MCP Server with Durability

The server will be similar to the ones we built in the streamable MCP and resumable MCP posts, with the difference that we now add the ability for the client to request a long-running task from the server, stop, let the task keep running on the server, and then reconnect and check the task's status.

In addition, this time we're going to store the tasks in a SQLite database to make it more production-like, since in the previous posts we stored the information in JSON files.
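Why SQLite instead of JSON files? Because every connection sees the same committed rows, so state written by one process survives and is visible to any later connection. A minimal sketch of that idea with nothing but the standard library (the table and function names here are illustrative, not the ones the server below uses):

```python
import os
import sqlite3
import tempfile

def save_status(db_path: str, task_id: str, status: str) -> None:
    # Each call opens its own connection, so any process can write
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE IF NOT EXISTS tasks (task_id TEXT PRIMARY KEY, status TEXT)")
    conn.execute("INSERT OR REPLACE INTO tasks VALUES (?, ?)", (task_id, status))
    conn.commit()
    conn.close()

def load_status(db_path: str, task_id: str):
    # A fresh connection reads whatever was last committed
    conn = sqlite3.connect(db_path)
    row = conn.execute("SELECT status FROM tasks WHERE task_id = ?", (task_id,)).fetchone()
    conn.close()
    return row[0] if row else None

db_path = os.path.join(tempfile.mkdtemp(), "tasks.db")
save_status(db_path, "demo-task", "running")
# A different connection (or a different process) sees the same row
print(load_status(db_path, "demo-task"))  # running
```

This is exactly the property the server exploits: a client that disconnects and reconnects can still read the task's last persisted state.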

Implementing the MCP Server with Durability

The server will have a DurableTaskManager class that manages task creation, stores the tasks in the database, reports their status, and can cancel them.

It will expose three resources: one to get the status of a task, one to list all tasks, and one to list tasks by status.

And it will expose five tools: three are concrete tasks (data migration, batch processing, and model training), one cancels a task, and the last one returns server statistics.
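The core pattern behind all of this is simple: the tool returns a resource link immediately, and the actual work continues in a background asyncio task that keeps updating shared state. A framework-free sketch of the idea (an in-memory dict stands in for SQLite, and all names here are hypothetical):

```python
import asyncio
import uuid

# In-memory stand-in for the SQLite table the real server uses
tasks: dict[str, dict] = {}

async def long_job(task_id: str, steps: int) -> None:
    """The background work: updates shared state as it progresses."""
    tasks[task_id]["status"] = "running"
    for i in range(steps):
        await asyncio.sleep(0)  # simulate one unit of work
        tasks[task_id]["progress"] = (i + 1) / steps * 100
    tasks[task_id]["status"] = "completed"

async def start_task(steps: int = 3) -> str:
    """Tool-style entry point: schedules the job and returns a link at once."""
    task_id = str(uuid.uuid4())
    tasks[task_id] = {"status": "pending", "progress": 0.0}
    asyncio.create_task(long_job(task_id, steps))
    return f"task://status/{task_id}"

async def main() -> str:
    link = await start_task()
    # The caller already has the link before the job has started running
    print(link, tasks[link.rsplit("/", 1)[-1]]["status"])
    await asyncio.sleep(0.05)  # give the background task time to finish
    return link

link = asyncio.run(main())
```

The real server below follows the same shape, but persists each update to SQLite so the state outlives the client connection.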

Creating the Server's Virtual Environment

First we create the folder where we'll develop the server

	
!mkdir MCP_durability_server

We initialize the project with uv

	
!cd MCP_durability_server && uv init .
	
Initialized project `mcp-durability-server` at `/Users/macm1/Documents/web/portafolio/posts/MCP_durability_server`

We create the virtual environment

	
!cd MCP_durability_server && uv venv
	
Using CPython 3.12.8
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate

We install the required libraries

	
!cd MCP_durability_server && uv add fastmcp
	
Resolved 64 packages in 344ms
Installed 61 packages in 136ms
+ annotated-types==0.7.0
+ anyio==4.10.0
+ attrs==25.3.0
+ authlib==1.6.3
+ certifi==2025.8.3
+ cffi==1.17.1
+ charset-normalizer==3.4.3
+ click==8.2.1
+ cryptography==45.0.6
+ cyclopts==3.23.0
+ dnspython==2.7.0
+ docstring-parser==0.17.0
+ docutils==0.22
+ email-validator==2.3.0
+ exceptiongroup==1.3.0
+ fastmcp==2.11.3
+ h11==0.16.0
+ httpcore==1.0.9
+ httpx==0.28.1
+ httpx-sse==0.4.1
+ idna==3.10
+ isodate==0.7.2
+ jsonschema==4.25.1
+ jsonschema-path==0.3.4
+ jsonschema-specifications==2025.4.1
+ lazy-object-proxy==1.12.0
+ markdown-it-py==4.0.0
+ markupsafe==3.0.2
+ mcp==1.13.1
+ mdurl==0.1.2
+ more-itertools==10.7.0
+ openapi-core==0.19.5
+ openapi-pydantic==0.5.1
+ openapi-schema-validator==0.6.3
+ openapi-spec-validator==0.7.2
+ parse==1.20.2
+ pathable==0.4.4
+ pycparser==2.22
+ pydantic==2.11.7
+ pydantic-core==2.33.2
+ pydantic-settings==2.10.1
+ pygments==2.19.2
+ pyperclip==1.9.0
+ python-dotenv==1.1.1
+ python-multipart==0.0.20
+ pyyaml==6.0.2
+ referencing==0.36.2
+ requests==2.32.5
+ rfc3339-validator==0.1.4
+ rich==14.1.0
+ rich-rst==1.3.1
+ rpds-py==0.27.1
+ six==1.17.0
+ sniffio==1.3.1
+ sse-starlette==3.0.2
+ starlette==0.47.3
+ typing-extensions==4.15.0
+ typing-inspection==0.4.1
+ urllib3==2.5.0
+ uvicorn==0.35.0
+ werkzeug==3.1.1

Server Code

	
%%writefile MCP_durability_server/server.py
"""
MCP Durability Server

MCP server that implements durability for long-running agents using resource links.
Allows long-running operations to survive server restarts and provides out-of-band
state tracking.

Implemented pattern:
1. Tools return resource links immediately
2. Background processing continues asynchronously
3. Clients can poll or subscribe to resources for state updates

Usage:
    python server.py
"""
import asyncio
import json
import sqlite3
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional

from fastmcp import FastMCP
from fastmcp.server.context import Context


class TaskStatus(Enum):
    """Long-running task status."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


@dataclass
class TaskResult:
    """Result data for a task."""
    task_id: str
    status: TaskStatus
    progress: float = 0.0
    total: Optional[float] = None
    message: Optional[str] = None
    result_data: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    created_at: float = 0.0
    updated_at: float = 0.0
    completed_at: Optional[float] = None
class DurableTaskManager:
    """Manages persistent task state and background execution."""

    def __init__(self, db_path: str = "mcp_tasks.db"):
        self.db_path = db_path
        self._background_tasks: Dict[str, asyncio.Task] = {}
        self._setup_database()

    def _setup_database(self) -> None:
        """Initializes the SQLite database for task persistence."""
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                task_id TEXT PRIMARY KEY,
                status TEXT NOT NULL,
                progress REAL DEFAULT 0.0,
                total REAL,
                message TEXT,
                result_data TEXT,
                error TEXT,
                created_at REAL NOT NULL,
                updated_at REAL NOT NULL,
                completed_at REAL
            )
        """)
        conn.commit()
        conn.close()

    async def create_task(self, task_id: Optional[str] = None) -> str:
        """Creates a new task and returns its ID."""
        if not task_id:
            task_id = str(uuid.uuid4())
        current_time = time.time()
        task_result = TaskResult(
            task_id=task_id,
            status=TaskStatus.PENDING,
            created_at=current_time,
            updated_at=current_time
        )
        await self._save_task(task_result)
        return task_id

    async def start_background_task(
        self,
        task_id: str,
        task_function,
        context: Optional[Context] = None
    ) -> None:
        """Starts a background task and tracks its execution."""
        async def wrapper():
            try:
                await self._update_task_status(task_id, TaskStatus.RUNNING)
                # Execute the real task
                if context:
                    result = await task_function(task_id, context, self)
                else:
                    result = await task_function(task_id, self)
                # Mark as completed with results
                await self._update_task_completion(task_id, result)
            except asyncio.CancelledError:
                await self._update_task_status(task_id, TaskStatus.CANCELLED)
                raise
            except Exception as e:
                await self._update_task_error(task_id, str(e))
            finally:
                # Clean up the background task reference
                self._background_tasks.pop(task_id, None)

        # Start the task in the background
        task = asyncio.create_task(wrapper())
        self._background_tasks[task_id] = task

    async def _save_task(self, task_result: TaskResult) -> None:
        """Saves the task result to the database."""
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            INSERT OR REPLACE INTO tasks
            (task_id, status, progress, total, message, result_data, error,
             created_at, updated_at, completed_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            task_result.task_id,
            task_result.status.value,
            task_result.progress,
            task_result.total,
            task_result.message,
            json.dumps(task_result.result_data) if task_result.result_data else None,
            task_result.error,
            task_result.created_at,
            task_result.updated_at,
            task_result.completed_at
        ))
        conn.commit()
        conn.close()

    async def get_task(self, task_id: str) -> Optional[TaskResult]:
        """Retrieves a task from the database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.execute("""
            SELECT task_id, status, progress, total, message, result_data, error,
                   created_at, updated_at, completed_at
            FROM tasks WHERE task_id = ?
        """, (task_id,))
        row = cursor.fetchone()
        conn.close()
        if not row:
            return None
        result_data = None
        if row[5]:  # result_data
            try:
                result_data = json.loads(row[5])
            except json.JSONDecodeError:
                pass
        return TaskResult(
            task_id=row[0],
            status=TaskStatus(row[1]),
            progress=row[2],
            total=row[3],
            message=row[4],
            result_data=result_data,
            error=row[6],
            created_at=row[7],
            updated_at=row[8],
            completed_at=row[9]
        )

    async def update_task_progress(
        self,
        task_id: str,
        progress: float,
        total: Optional[float] = None,
        message: Optional[str] = None
    ) -> None:
        """Updates the task progress."""
        task_result = await self.get_task(task_id)
        if not task_result:
            return
        task_result.progress = progress
        if total is not None:
            task_result.total = total
        if message is not None:
            task_result.message = message
        task_result.updated_at = time.time()
        await self._save_task(task_result)

    async def _update_task_status(self, task_id: str, status: TaskStatus) -> None:
        """Updates the task status."""
        task_result = await self.get_task(task_id)
        if not task_result:
            return
        task_result.status = status
        task_result.updated_at = time.time()
        await self._save_task(task_result)

    async def _update_task_completion(self, task_id: str, result_data: Any) -> None:
        """Marks the task as completed with results."""
        task_result = await self.get_task(task_id)
        if not task_result:
            return
        task_result.status = TaskStatus.COMPLETED
        task_result.progress = task_result.total or 100.0
        task_result.result_data = result_data if isinstance(result_data, dict) else {"result": result_data}
        task_result.completed_at = time.time()
        task_result.updated_at = time.time()
        await self._save_task(task_result)

    async def _update_task_error(self, task_id: str, error: str) -> None:
        """Marks the task as failed with an error."""
        task_result = await self.get_task(task_id)
        if not task_result:
            return
        task_result.status = TaskStatus.FAILED
        task_result.error = error
        task_result.updated_at = time.time()
        await self._save_task(task_result)

    async def cancel_task(self, task_id: str) -> bool:
        """Cancels a running background task."""
        if task_id in self._background_tasks:
            task = self._background_tasks[task_id]
            task.cancel()
            await self._update_task_status(task_id, TaskStatus.CANCELLED)
            return True
        return False

    async def list_tasks(self, status_filter: Optional[TaskStatus] = None) -> List[TaskResult]:
        """Lists all tasks, optionally filtered by status."""
        conn = sqlite3.connect(self.db_path)
        if status_filter:
            cursor = conn.execute("""
                SELECT task_id, status, progress, total, message, result_data, error,
                       created_at, updated_at, completed_at
                FROM tasks WHERE status = ? ORDER BY updated_at DESC
            """, (status_filter.value,))
        else:
            cursor = conn.execute("""
                SELECT task_id, status, progress, total, message, result_data, error,
                       created_at, updated_at, completed_at
                FROM tasks ORDER BY updated_at DESC
            """)
        rows = cursor.fetchall()
        conn.close()
        tasks = []
        for row in rows:
            result_data = None
            if row[5]:
                try:
                    result_data = json.loads(row[5])
                except json.JSONDecodeError:
                    pass
            tasks.append(TaskResult(
                task_id=row[0],
                status=TaskStatus(row[1]),
                progress=row[2],
                total=row[3],
                message=row[4],
                result_data=result_data,
                error=row[6],
                created_at=row[7],
                updated_at=row[8],
                completed_at=row[9]
            ))
        return tasks
# Create the MCP server with durability capabilities
server = FastMCP(
    name="MCP Durability Server",
    instructions="An MCP server that demonstrates durability for long-running agents"
)

# Initialize the task manager
task_manager = DurableTaskManager()


# Resources to track the state of tasks
@server.resource("task://status/{task_id}")
async def get_task_status(task_id: str) -> str:
    """Gets the current status of a task by ID."""
    task_result = await task_manager.get_task(task_id)
    if not task_result:
        return json.dumps({"error": "Task not found", "task_id": task_id})
    data = {
        "task_id": task_result.task_id,
        "status": task_result.status.value,
        "progress": task_result.progress,
        "total": task_result.total,
        "message": task_result.message,
        "result_data": task_result.result_data,
        "error": task_result.error,
        "created_at": task_result.created_at,
        "updated_at": task_result.updated_at,
        "completed_at": task_result.completed_at,
    }
    return json.dumps(data)


@server.resource("task://list")
async def list_all_tasks() -> str:
    """Lists all tasks with their current status."""
    tasks = await task_manager.list_tasks()
    data = {
        "tasks": [
            {
                "task_id": task.task_id,
                "status": task.status.value,
                "progress": task.progress,
                "total": task.total,
                "message": task.message,
                "created_at": task.created_at,
                "updated_at": task.updated_at,
                "completed_at": task.completed_at,
                "has_result": task.result_data is not None,
                "has_error": task.error is not None
            } for task in tasks
        ],
        "total": len(tasks)
    }
    return json.dumps(data)


@server.resource("task://list/{status}")
async def list_tasks_by_status(status: str) -> str:
    """Lists tasks filtered by status."""
    try:
        status_enum = TaskStatus(status.lower())
        tasks = await task_manager.list_tasks(status_enum)
        data = {
            "tasks": [
                {
                    "task_id": task.task_id,
                    "status": task.status.value,
                    "progress": task.progress,
                    "total": task.total,
                    "message": task.message,
                    "created_at": task.created_at,
                    "updated_at": task.updated_at,
                    "completed_at": task.completed_at
                } for task in tasks
            ],
            "status_filter": status,
            "total": len(tasks)
        }
        return json.dumps(data)
    except ValueError:
        data = {
            "error": f"Invalid status: {status}",
            "valid_statuses": [s.value for s in TaskStatus]
        }
        return json.dumps(data)
# Tools to demonstrate the durability functionality
@server.tool
async def start_data_migration(
    source_path: str,
    destination_path: str,
    ctx: Context,
    record_count: int = 1000
) -> str:
    """
    Starts a long-running data migration task.

    Args:
        source_path: Source path of the data
        destination_path: Destination path of the data
        record_count: Number of records to migrate
        ctx: MCP context

    Returns:
        Resource URI to track progress
    """
    await ctx.info(f"Starting migration of {record_count} records from {source_path} to {destination_path}")
    print(f"Starting migration of {record_count} records from {source_path} to {destination_path}")

    async def migration_task(task_id: str, context: Context, manager: DurableTaskManager):
        """Simulated data migration task."""
        batch_size = 100
        migrated = 0
        for batch_start in range(0, record_count, batch_size):
            # Simulate batch processing
            await asyncio.sleep(1)  # Simulate work
            batch_end = min(batch_start + batch_size, record_count)
            migrated = batch_end
            # Update progress
            progress_pct = (migrated / record_count) * 100
            message = f"Migrated {migrated}/{record_count} records"
            await manager.update_task_progress(task_id, progress_pct, 100.0, message)
            try:
                await context.report_progress(migrated, record_count, message)
            except Exception:
                pass
        try:
            await context.info(f"Data migration completed: {migrated} records")
        except Exception:
            pass
        return {
            "migrated_records": migrated,
            "total_records": record_count,
            "source_path": source_path,
            "destination_path": destination_path,
            "success": True,
            "completion_time": time.time()
        }

    # Create the task and start background processing
    task_id = await task_manager.create_task()
    await task_manager.start_background_task(task_id, migration_task, ctx)
    resource_link = f"task://status/{task_id}"
    await ctx.info(f"Data migration started. Track progress at: {resource_link}")
    return resource_link
@server.tool
async def start_batch_processing(
    batch_size: int,
    total_items: int,
    ctx: Context,
    processing_delay: float = 10
) -> str:
    """
    Starts a batch processing task.

    Args:
        batch_size: Size of each batch
        total_items: Total number of items to process
        processing_delay: Delay per batch in seconds
        ctx: MCP context

    Returns:
        Resource URI to track progress
    """
    await ctx.info(f"Starting batch processing: {total_items} items in batches of {batch_size}")

    async def batch_task(task_id: str, context: Context, manager: DurableTaskManager):
        """Batch processing task."""
        processed = 0
        for i in range(0, total_items, batch_size):
            await asyncio.sleep(processing_delay)  # Simulate work
            batch_end = min(i + batch_size, total_items)
            processed = batch_end
            progress = (processed / total_items) * 100
            message = f"Processed {processed}/{total_items} items"
            await manager.update_task_progress(task_id, progress, 100.0, message)
            try:
                await context.report_progress(processed, total_items, message)
            except Exception:
                pass
        try:
            await context.info(f"Batch processing completed: {processed} items")
        except Exception:
            pass
        return {
            "processed": processed,
            "total": total_items,
            "batch_size": batch_size,
            "processing_delay": processing_delay,
            "success": True
        }

    task_id = await task_manager.create_task()
    await task_manager.start_background_task(task_id, batch_task, ctx)
    resource_link = f"task://status/{task_id}"
    await ctx.info(f"Batch processing started. Track progress at: {resource_link}")
    return resource_link
@server.tool
async def start_ml_training(
    model_name: str,
    ctx: Context,
    dataset_size: int = 10000,
    epochs: int = 100
) -> str:
    """
    Simulates machine learning model training.

    Args:
        model_name: Name of the model to train
        dataset_size: Size of the dataset
        epochs: Number of training epochs
        ctx: MCP context

    Returns:
        Resource URI to track progress
    """
    await ctx.info(f"Starting training of model '{model_name}' with {dataset_size} samples for {epochs} epochs")

    async def training_task(task_id: str, context: Context, manager: DurableTaskManager):
        """Simulated ML training task."""
        for epoch in range(1, epochs + 1):
            # Simulate training one epoch
            await asyncio.sleep(10)
            # Simulate training metrics
            loss = 1.0 - (epoch / epochs) * 0.8 + (epoch % 10) * 0.01
            accuracy = (epoch / epochs) * 0.95 + (epoch % 5) * 0.002
            progress = (epoch / epochs) * 100
            message = f"Epoch {epoch}/{epochs} - Loss: {loss:.4f}, Accuracy: {accuracy:.4f}"
            await manager.update_task_progress(task_id, progress, 100.0, message)
            try:
                await context.report_progress(epoch, epochs, message)
            except Exception:
                pass
        try:
            await context.info(f"Model '{model_name}' training completed")
        except Exception:
            pass
        return {
            "model_name": model_name,
            "dataset_size": dataset_size,
            "epochs_completed": epochs,
            "final_loss": loss,
            "final_accuracy": accuracy,
            "success": True,
            "training_time_seconds": epochs * 10  # matches the simulated 10 s per epoch
        }

    task_id = await task_manager.create_task()
    await task_manager.start_background_task(task_id, training_task, ctx)
    resource_link = f"task://status/{task_id}"
    await ctx.info(f"Training started. Track progress at: {resource_link}")
    return resource_link
@server.tool
async def cancel_task(task_id: str, ctx: Context) -> str:
    """
    Cancels an in-progress task.

    Args:
        task_id: ID of the task to cancel
        ctx: MCP context

    Returns:
        Status message of the cancellation
    """
    success = await task_manager.cancel_task(task_id)
    if success:
        await ctx.info(f"Task {task_id} cancelled successfully")
        return f"Task {task_id} cancelled. Status available at: task://status/{task_id}"
    else:
        await ctx.warning(f"Task {task_id} not found or cannot be cancelled")
        return f"Task {task_id} not found or cannot be cancelled"


@server.tool
async def get_server_stats(ctx: Context) -> Dict[str, Any]:
    """
    Gets durability server statistics.

    Args:
        ctx: MCP context

    Returns:
        Server statistics
    """
    all_tasks = await task_manager.list_tasks()
    stats = {
        "total_tasks": len(all_tasks),
        "running_tasks": len([t for t in all_tasks if t.status == TaskStatus.RUNNING]),
        "completed_tasks": len([t for t in all_tasks if t.status == TaskStatus.COMPLETED]),
        "failed_tasks": len([t for t in all_tasks if t.status == TaskStatus.FAILED]),
        "cancelled_tasks": len([t for t in all_tasks if t.status == TaskStatus.CANCELLED]),
        "pending_tasks": len([t for t in all_tasks if t.status == TaskStatus.PENDING]),
        "active_background_tasks": len(task_manager._background_tasks),
    }
    await ctx.info(f"Server statistics: {stats['total_tasks']} total tasks, {stats['running_tasks']} running")
    return stats


if __name__ == "__main__":
    print("Durability MCP server started")
    print("Available tools:")
    print("  - start_data_migration: Starts a data migration")
    print("  - start_batch_processing: Starts batch processing")
    print("  - start_ml_training: Simulates ML training")
    print("  - cancel_task: Cancels a task")
    print("  - get_server_stats: Gets server statistics")
    print("Available resources:")
    print("  - task://status/{task_id}: Specific task status")
    print("  - task://list: Lists all tasks")
    print("  - task://list/{status}: Lists tasks by status")
    server.run(transport="http", host="127.0.0.1", port=8080)
	
Writing MCP_durability_server/server.py

MCP Client with Durability

The client will be very similar to the ones we built in the streamable MCP and resumable MCP posts.

Implementing the MCP Client with Durability

There will be a DurabilityClient class with methods to connect to the server, start the three possible tasks (data migration, batch processing, and model training), get a task's status, list all tasks, and cancel a task.
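The heart of that client is the polling loop: read the status resource, wait, and repeat until a terminal state or a timeout. Stripped of the MCP plumbing, it reduces to something like this sketch (function names and intervals are illustrative, not the client's actual API):

```python
import time
from typing import Callable, Dict

TERMINAL = {"completed", "failed", "cancelled"}

def poll_until_complete(
    get_status: Callable[[], Dict],
    poll_interval: float = 0.01,
    max_duration: float = 1.0,
) -> Dict:
    """Polls a status callable until a terminal state or the deadline."""
    deadline = time.monotonic() + max_duration
    while time.monotonic() < deadline:
        status = get_status()
        if status.get("status") in TERMINAL:
            return status
        time.sleep(poll_interval)
    return get_status()  # return the last known state on timeout

# Fake status source that "completes" after three reads
calls = {"n": 0}
def fake_status() -> Dict:
    calls["n"] += 1
    done = calls["n"] >= 3
    return {"status": "completed" if done else "running",
            "progress": min(calls["n"], 3) / 3 * 100}

final = poll_until_complete(fake_status)
print(final)  # {'status': 'completed', 'progress': 100.0}
```

In the real client, `get_status` is a resource read against `task://status/{task_id}`, which is what makes the loop resumable: any client that knows the URI can pick up the polling, even after a disconnect.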

Creating the Client's Virtual Environment

First we create the folder where we'll develop the client

	
!mkdir MCP_durability_client

We initialize the project with uv

	
!cd MCP_durability_client && uv init .
	
Initialized project `mcp-durability-client` at `/Users/macm1/Documents/web/portafolio/posts/MCP_durability_client`

We create the virtual environment

	
!cd MCP_durability_client && uv venv
	
Using CPython 3.12.8
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate

We install the required libraries

	
!cd MCP_durability_client && uv add fastmcp
	
Resolved 64 packages in 17ms
Installed 61 packages in 80ms
+ annotated-types==0.7.0
+ anyio==4.10.0
+ attrs==25.3.0
+ authlib==1.6.3
+ certifi==2025.8.3
+ cffi==1.17.1
+ charset-normalizer==3.4.3
+ click==8.2.1
+ cryptography==45.0.6
+ cyclopts==3.23.0
+ dnspython==2.7.0
+ docstring-parser==0.17.0
+ docutils==0.22
+ email-validator==2.3.0
+ exceptiongroup==1.3.0
+ fastmcp==2.11.3
+ h11==0.16.0
+ httpcore==1.0.9
+ httpx==0.28.1
+ httpx-sse==0.4.1
+ idna==3.10
+ isodate==0.7.2
+ jsonschema==4.25.1
+ jsonschema-path==0.3.4
+ jsonschema-specifications==2025.4.1
+ lazy-object-proxy==1.12.0
+ markdown-it-py==4.0.0
+ markupsafe==3.0.2
+ mcp==1.13.1
+ mdurl==0.1.2
+ more-itertools==10.7.0
+ openapi-core==0.19.5
+ openapi-pydantic==0.5.1
+ openapi-schema-validator==0.6.3
+ openapi-spec-validator==0.7.2
+ parse==1.20.2
+ pathable==0.4.4
+ pycparser==2.22
+ pydantic==2.11.7
+ pydantic-core==2.33.2
+ pydantic-settings==2.10.1
+ pygments==2.19.2
+ pyperclip==1.9.0
+ python-dotenv==1.1.1
+ python-multipart==0.0.20
+ pyyaml==6.0.2
+ referencing==0.36.2
+ requests==2.32.5
+ rfc3339-validator==0.1.4
+ rich==14.1.0
+ rich-rst==1.3.1
+ rpds-py==0.27.1
+ six==1.17.0
+ sniffio==1.3.1
+ sse-starlette==3.0.2
+ starlette==0.47.3
+ typing-extensions==4.15.0
+ typing-inspection==0.4.1
+ urllib3==2.5.0
+ uvicorn==0.35.0
+ werkzeug==3.1.1

Client Code

	
%%writefile MCP_durability_client/client.py
"""
Durability MCP client

Client that demonstrates how to interact with an MCP server that implements durability.

Shows how to:
1. Start long-running tasks
2. Monitor progress using resource polling
3. Subscribe to status updates (simulated)
4. Handle persistent tasks that survive server restarts

Usage:
    python client.py
"""
import argparse
import asyncio
import json
import time
from typing import Any, Dict, List, Optional

from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport


class DurabilityClient:
    """Client that demonstrates durability patterns with MCP."""

    def __init__(self, server_command: List[str]):
        """
        Initializes the durability client.

        Args:
            server_command: Command to execute the MCP server
        """
        self.server_command = server_command
        self._polling_tasks: Dict[str, asyncio.Task] = {}

    async def connect(self) -> Client:
        """Creates and connects the client to the server."""
        transport = StreamableHttpTransport(
            url="http://127.0.0.1:8080/mcp"
        )
        client = Client(transport)
        return client
    async def start_data_migration(
        self,
        client: Client,
        source_path: str = "/data/source",
        destination_path: str = "/data/destination",
        record_count: int = 1000
    ) -> str:
        """
        Starts a data migration task and returns the resource link.

        Args:
            client: Connected MCP client
            source_path: Source path
            destination_path: Destination path
            record_count: Number of records to migrate

        Returns:
            Resource URI to track progress
        """
        print(f"🚀 Starting data migration: {record_count} records")
        print(f"   Source: {source_path}")
        print(f"   Destination: {destination_path}")
        result = await client.call_tool(
            "start_data_migration",
            {
                "source_path": source_path,
                "destination_path": destination_path,
                "record_count": record_count
            }
        )
        resource_uri = result.content[0].text
        print(f"✅ Task started. Resource URI: {resource_uri}")
        return resource_uri

    async def start_batch_processing(
        self,
        client: Client,
        batch_size: int = 50,
        total_items: int = 500,
        processing_delay: float = 0.3
    ) -> str:
        """
        Starts batch processing.

        Args:
            client: Connected MCP client
            batch_size: Batch size
            total_items: Total items
            processing_delay: Processing delay

        Returns:
            Resource URI to track progress
        """
        print(f"🔄 Starting batch processing:")
        print(f"   Total items: {total_items}")
        print(f"   Batch size: {batch_size}")
        print(f"   Processing delay: {processing_delay}s")
        result = await client.call_tool(
            "start_batch_processing",
            {
                "batch_size": batch_size,
                "total_items": total_items,
                "processing_delay": processing_delay
            }
        )
        resource_uri = result.content[0].text
        print(f"✅ Batch processing started. Resource URI: {resource_uri}")
        return resource_uri

    async def start_ml_training(
        self,
        client: Client,
        model_name: str = "clasificador-texto",
        dataset_size: int = 5000,
        epochs: int = 50
    ) -> str:
        """
        Starts ML model training.

        Args:
            client: Connected MCP client
            model_name: Model name
            dataset_size: Dataset size
            epochs: Number of epochs

        Returns:
            Resource URI to track progress
        """
        print(f"🤖 Starting ML training:")
        print(f"   Model: {model_name}")
        print(f"   Dataset size: {dataset_size} samples")
        print(f"   Epochs: {epochs}")
        result = await client.call_tool(
            "start_ml_training",
            {
                "model_name": model_name,
                "dataset_size": dataset_size,
                "epochs": epochs
            }
        )
        resource_uri = result.content[0].text
        print(f"✅ ML training started. Resource URI: {resource_uri}")
        return resource_uri
    async def get_task_status(self, client: Client, resource_uri: str) -> Dict[str, Any]:
        """
        Gets the current status of a task.

        Args:
            client: Connected MCP client
            resource_uri: Resource URI of the task

        Returns:
            Current status of the task
        """
        try:
            result = await client.read_resource(resource_uri)
            # result should be a list of ReadResourceContents
            if isinstance(result, list) and len(result) > 0:
                content_block = result[0]
                # The content is in the 'text' attribute
                if hasattr(content_block, 'text') and content_block.text:
                    return json.loads(content_block.text)
                else:
                    return {"error": f"No text found in the resource. Type: {type(content_block)}, text={getattr(content_block, 'text', None)}"}
            else:
                return {"error": "Empty resource"}
        except Exception as e:
            return {"error": f"Error reading resource: {str(e)}"}

    async def poll_task_until_complete(
        self,
        client: Client,
        resource_uri: str,
        poll_interval: float = 1.0,
        max_duration: float = 10.0
    ) -> Dict[str, Any]:
        """
        Polls a task until it completes or fails.

        Args:
            client: Connected MCP client
            resource_uri: Resource URI of the task
            poll_interval: Interval between polls in seconds
            max_duration: Maximum polling duration in seconds

        Returns:
            Final status of the task
        """
        task_id = resource_uri.split("/")[-1]
        print(f"📊 Monitoring task {task_id}...")
        start_time = time.time()
        last_progress = -1
        while time.time() - start_time < max_duration:
            status = await self.get_task_status(client, resource_uri)
            if "error" in status and status["error"] is not None:
                print(f"Error getting status: {status['error']}")
                return status
            current_status = status.get("status", "unknown")
            progress = status.get("progress", 0)
            message = status.get("message", "")
            # Show updates only if the progress changed
            if progress != last_progress:
                progress_bar = self._create_progress_bar(progress)
                print(f"   {progress_bar} {progress:5.1f}% | {current_status.upper()} | {message}")
                last_progress = progress
            # Check if the task is complete
            if current_status in ["completed", "failed", "cancelled"]:
                print(f"Task {current_status}: {task_id}")
                if current_status == "completed" and status.get("result_data"):
                    print("Results:")
                    self._print_results(status["result_data"])
                elif current_status == "failed" and status.get("error"):
                    print(f"Error: {status['error']}")
                return status
            await asyncio.sleep(poll_interval)
        print(f"Timeout for task {task_id}")
        return await self.get_task_status(client, resource_uri)

    def _create_progress_bar(self, progress: float, width: int = 20) -> str:
        """Creates a visual progress bar."""
        filled = int(width * progress / 100)
        bar = "█" * filled + "░" * (width - filled)
        return f"[{bar}]"
    def _print_results(self, result_data: Dict[str, Any], indent: int = 4) -> None:
        """Prints the results in a formatted way."""
        spaces = " " * indent
        for key, value in result_data.items():
            if isinstance(value, dict):
                print(f"{spaces}{key}:")
                self._print_results(value, indent + 2)
            elif isinstance(value, list):
                print(f"{spaces}{key}: {len(value)} items")
            else:
                print(f"{spaces}{key}: {value}")

    async def list_all_tasks(self, client: Client) -> Dict[str, Any]:
        """Lists all tasks on the server."""
        try:
            result = await client.read_resource("task://list")
            # result should be a list of ReadResourceContents
            if isinstance(result, list) and len(result) > 0:
                content_block = result[0]
                # The content is in the 'text' attribute
                if hasattr(content_block, 'text') and content_block.text:
                    return json.loads(content_block.text)
                else:
                    return {"error": f"No text found in the resource. Type: {type(content_block)}"}
            else:
                return {"error": "Empty resource"}
        except Exception as e:
            return {"error": f"Error reading task list: {str(e)}"}

    async def list_tasks_by_status(self, client: Client, status: str) -> Dict[str, Any]:
        """Lists tasks filtered by status."""
        try:
            result = await client.read_resource(f"task://list/{status}")
            # result should be a list of ReadResourceContents
            if isinstance(result, list) and len(result) > 0:
                content_block = result[0]
                # The content is in the 'text' attribute
                if hasattr(content_block, 'text') and content_block.text:
                    return json.loads(content_block.text)
                else:
                    return {"error": f"No text found in the resource. Type: {type(content_block)}"}
            else:
                return {"error": "Empty resource"}
        except Exception as e:
            return {"error": f"Error reading task list: {str(e)}"}
    async def print_task_list(self, client: Client) -> None:
        """Prints a formatted list of all tasks."""
        print("Task list:")
        task_list = await self.list_all_tasks(client)
        if "error" in task_list:
            print(f"   {task_list['error']}")
            return
        tasks = task_list.get("tasks", [])
        if not tasks:
            print("   No tasks")
            return
        for task in tasks:
            task_id = task["task_id"]
            status = task["status"].upper()
            progress = task.get("progress", 0)
            message = task.get("message", "")
            print(f"   ID: {task_id} | {status:9} | {progress:5.1f}% | {message}")

    async def select_task(self, client: Client) -> Optional[str]:
        """
        Shows a numbered list of tasks and lets the user select one.

        Args:
            client: Connected MCP client

        Returns:
            The ID of the selected task, or None if no task was selected
        """
        print("📋 Available tasks:")
        task_list = await self.list_all_tasks(client)
        if "error" in task_list:
            print(f"   ❌ {task_list['error']}")
            return None
        tasks = task_list.get("tasks", [])
        if not tasks:
            print("   No tasks available")
            return None
        # Show tasks with numbers
        for i, task in enumerate(tasks, 1):
            task_id = task["task_id"]
            status = task["status"].upper()
            progress = task.get("progress", 0)
            message = task.get("message", "")
            print(f"   {i}. ID: {task_id} | {status:9} | {progress:5.1f}% | {message}")
        # Ask the user to select one
        try:
            choice = input(f"   Select a task (1-{len(tasks)}) or press Enter to cancel: ").strip()
            if not choice:
                print("Selection cancelled")
                return None
            choice_num = int(choice)
            if 1 <= choice_num <= len(tasks):
                selected_task = tasks[choice_num - 1]
                selected_id = selected_task["task_id"]
                print(f"✅ Selected task: {selected_id}")
                return selected_id
            else:
                print(f"❌ Invalid selection. Please choose between 1 and {len(tasks)}")
                return None
        except ValueError:
            print("❌ Invalid input. Please enter a number")
            return None
        except Exception as e:
            print(f"❌ Error selecting task: {str(e)}")
            return None
async def cancel_task(self, client: Client, task_id: str) -> str:
"""Cancels a specific task."""
try:
result = await client.call_tool("cancel_task", {"task_id": task_id})
response = result.content[0].text
print(f"🚫 {response}")
return response
except Exception as e:
error_msg = f"Error canceling task: {str(e)}"
print(f"❌ {error_msg}")
return error_msg
async def get_server_stats(self, client: Client) -> Dict[str, Any]:
"""Gets server statistics."""
try:
result = await client.call_tool("get_server_stats", {})
# result.content is a list of ContentBlock
if hasattr(result, 'content') and len(result.content) > 0:
content = result.content[0]
if hasattr(content, 'text'):
return json.loads(content.text)
else:
# If not text, it may be the object directly
return content if isinstance(content, dict) else {"error": f"Unexpected format: {type(content)}"}
else:
return {"error": "Empty response from get_server_stats"}
except Exception as e:
return {"error": f"Error getting server statistics: {str(e)}"}
async def print_server_stats(self, client: Client) -> None:
"""Prints server statistics."""
print("📊 Server statistics:")
stats = await self.get_server_stats(client)
if "error" in stats:
print(f" ❌ {stats['error']}")
return
print(f" Total tasks: {stats.get('total_tasks', 0)}")
print(f" Running: {stats.get('running_tasks', 0)}")
print(f" Completed: {stats.get('completed_tasks', 0)}")
print(f" Failed: {stats.get('failed_tasks', 0)}")
print(f" Cancelled: {stats.get('cancelled_tasks', 0)}")
print(f" Pending: {stats.get('pending_tasks', 0)}")
print(f" Active background tasks: {stats.get('active_background_tasks', 0)}")
async def interactive_demo(server_path: Optional[str] = None):
"""Interactive demo to explore the system."""
print("🚀 Interactive Demo of the MCP Durability System")
print("=" * 55)
client_manager = DurabilityClient([])
async with await client_manager.connect() as client:
print("🔗 Connected to the durability server")
while True:
print(" " + "=" * 40)
print("Available options:")
print("1. Start data migration")
print("2. Start batch processing")
print("3. Start ML training")
print("4. View server statistics")
print("5. View task list")
print("6. Monitor specific task")
print("7. Cancel task")
print("8. Exit")
try:
choice = input(" Select an option (1-8): ").strip()
if choice == "1":
records = int(input("Number of records to migrate (default 500): ") or "500")
uri = await client_manager.start_data_migration(client, record_count=records)
print(f"Task URI: {uri}")
elif choice == "2":
total = int(input("Total elements (default 300): ") or "300")
batch_size = int(input("Batch size (default 30): ") or "30")
uri = await client_manager.start_batch_processing(client, total_items=total, batch_size=batch_size)
print(f"Task URI: {uri}")
elif choice == "3":
model_name = input("Model name (default 'demo-model'): ") or "demo-model"
epochs = int(input("Number of epochs (default 25): ") or "25")
uri = await client_manager.start_ml_training(client, model_name=model_name, epochs=epochs)
print(f"Task URI: {uri}")
elif choice == "4":
await client_manager.print_server_stats(client)
elif choice == "5":
await client_manager.print_task_list(client)
elif choice == "6":
task_id = await client_manager.select_task(client)
if task_id:
uri = f"task://status/{task_id}"
await client_manager.poll_task_until_complete(client, uri)
elif choice == "7":
task_id = await client_manager.select_task(client)
if task_id:
await client_manager.cancel_task(client, task_id)
elif choice == "8":
print("👋 Bye!")
break
else:
print("❌ Invalid option. Please select 1-8.")
except KeyboardInterrupt:
print(" 👋 Demo interrupted. Bye!")
break
except Exception as e:
print(f"❌ Error: {str(e)}")
if __name__ == "__main__":
try:
asyncio.run(interactive_demo())
except KeyboardInterrupt:
print(" 👋 Bye!")
except Exception as e:
print(f"❌ Error: {str(e)}")
Writing MCP_durability_client/client.py

Executionlink image 9

First we start the server

	
!cd MCP_durability_server && source .venv/bin/activate && uv run server.py
Durability MCP server started
Available tools:
- start_data_migration: Starts data migration
- start_batch_processing: Starts batch processing
- start_ml_training: Simulates ML training
- cancel_task: Cancels a task
- get_server_stats: Gets server statistics
Resources available:
- task://status/{task_id}: Specific task status
- task://list: Lists all tasks
- task://list/{status}: Lists tasks by status
╭─ FastMCP 2.0 ──────────────────────────────────────────────────────────────╮
│ │
│ _ __ ___ ______ __ __ _____________ ____ ____ │
│ _ __ ___ / ____/___ ______/ /_/ |/ / ____/ __ |___ / __ │
│ _ __ ___ / /_ / __ `/ ___/ __/ /|_/ / / / /_/ / ___/ / / / / / │
│ _ __ ___ / __/ / /_/ (__ ) /_/ / / / /___/ ____/ / __/_/ /_/ / │
│ _ __ ___ /_/ __,_/____/__/_/ /_/____/_/ /_____(_)____/ │
│ │
│ │
│ │
│ 🖥️ Server name: MCP Durability Server │
│ 📦 Transport: Streamable-HTTP │
│ 🔗 Server URL: http://127.0.0.1:8080/mcp │
│ │
│ 📚 Docs: https://gofastmcp.com │
│ 🚀 Deploy: https://fastmcp.cloud │
│ │
│ 🏎️ FastMCP version: 2.11.3 │
│ 🤝 MCP version: 1.13.1 │
│ │
╰────────────────────────────────────────────────────────────────────────────╯
[08/28/25 11:46:08] INFO     Starting MCP server 'MCP Durability Server' with transport 'http' on http://127.0.0.1:8080/mcp    server.py:1522
INFO: Started server process [55234]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8080 (Press CTRL+C to quit)

Now we run the client

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server
========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8):

First, we select option 5 to check that there are no tasks running

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server
========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 5
Task list:
No tasks
========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8):

We can see that it prints

Task list:
No tasks

There are no tasks running

Now we select option 1 to start the data migration task, and we choose 100,000 records to make sure the task runs long enough for everything we need to do in this post

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 1
Number of records to migrate (default 500): 100000
🚀 Starting data migration: 100000 records
Source: /data/source
Destination: /data/destination
[08/28/25 12:05:22] INFO     Server log: Starting migration of 100000 records from /data/source to /data/destination    logging.py:40
                    INFO     Server log: Data migration started. Track progress at: task://status/821a210a-8672-4eba-b27c-500bf63e58c1    logging.py:40
✅ Task started. Resource URI: task://status/821a210a-8672-4eba-b27c-500bf63e58c1
Task URI: task://status/821a210a-8672-4eba-b27c-500bf63e58c1

We select option 5 to see the task list

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 5
Task list:
ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING | 1.0% | Migrated 1000/100000 records

The task we just requested from the MCP server now shows up; we can see its ID and status: it has migrated 1,000 of the 100,000 records
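Each row of that list is built by the client's `print_task_list` from the JSON the `task://list` resource returns. As a small self-contained sketch (the payload below is an assumption based on the fields the client reads: `task_id`, `status`, `progress`, `message`), this reproduces that formatting:

```python
import json

def format_task_row(task: dict) -> str:
    # Same row format the client's print_task_list uses
    return (f"ID: {task['task_id']} | {task['status'].upper():9} | "
            f"{task.get('progress', 0):5.1f}% | {task.get('message', '')}")

payload = '{"task_id": "abc", "status": "running", "progress": 1.0, "message": "Migrated 1000/100000 records"}'
row = format_task_row(json.loads(payload))
print(row)  # ID: abc | RUNNING   |   1.0% | Migrated 1000/100000 records
```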

We select option 8 to exit the client

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 8
👋 Bye!

We run the client again

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server
========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8):

We select option 5 again to see the list of tasks running on the server

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
🚀 Interactive Demo of the MCP Durability System
=======================================================
🔗 Connected to the durability server
========================================
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 5
Task list:
ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING | 3.4% | Migrated 3400/100000 records

We can see that the task kept running and has processed more records: before it was at 1,000 and now it is at 3,400
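This is the durability part at work: the server persists each task's state in SQLite, so progress survives client disconnects. A minimal sketch of the idea, with an assumed `tasks` table rather than the server's actual schema:

```python
import sqlite3

# Sketch of the durability idea: task state lives in SQLite, so it survives
# client disconnects. Table name and columns are assumptions, not the
# post's actual schema.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (task_id TEXT PRIMARY KEY, status TEXT, progress REAL, message TEXT)")

# The first client session observes the task at 1.0%...
conn.execute("INSERT INTO tasks VALUES ('task-1', 'running', 1.0, 'Migrated 1000/100000 records')")
# ...the server keeps updating the row while no client is connected...
conn.execute("UPDATE tasks SET progress = 3.4, message = 'Migrated 3400/100000 records' WHERE task_id = 'task-1'")
# ...and a reconnecting client reads the fresh state.
row = conn.execute("SELECT status, progress FROM tasks WHERE task_id = 'task-1'").fetchone()
print(row)  # ('running', 3.4)
```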

Now we select option 6 to monitor the task we requested from the server. This lets us watch it for 10 seconds
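Under the hood, this kind of monitoring is a polling loop: keep reading the task status until it finishes or a timeout expires. A self-contained sketch of that pattern, where `fake_status` is a hypothetical stand-in for reading the `task://status/{task_id}` resource:

```python
import asyncio

async def poll_until_complete(get_status, timeout=10.0, interval=0.1):
    # Poll get_status() until the task leaves RUNNING/PENDING or the
    # timeout expires (stand-in for reading task://status/{task_id})
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        status = await get_status()
        if status["status"] not in ("running", "pending"):
            return status
        await asyncio.sleep(interval)
    return None  # timed out; the task keeps running on the server

# Simulated status source that completes on the third poll
polls = {"n": 0}
async def fake_status():
    polls["n"] += 1
    done = polls["n"] >= 3
    return {"status": "completed" if done else "running"}

result = asyncio.run(poll_until_complete(fake_status, timeout=5.0, interval=0.01))
print(result)  # {'status': 'completed'}
```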

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 6
📋 Available tasks:
1. ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING | 4.2% | Migrated 4200/100000 records
Select a task (1-1) or press Enter to cancel: 1
✅ Selected task: 821a210a-8672-4eba-b27c-500bf63e58c1
📊 Monitoring task 821a210a-8672-4eba-b27c-500bf63e58c1...
[░░░░░░░░░░░░░░░░░░░░] 4.9% | RUNNING | Migrated 4900/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.0% | RUNNING | Migrated 5000/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.1% | RUNNING | Migrated 5100/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.2% | RUNNING | Migrated 5200/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.3% | RUNNING | Migrated 5300/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.4% | RUNNING | Migrated 5400/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.5% | RUNNING | Migrated 5500/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.6% | RUNNING | Migrated 5600/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.7% | RUNNING | Migrated 5700/100000 records
[█░░░░░░░░░░░░░░░░░░░] 5.8% | RUNNING | Migrated 5800/100000 records
Timeout for task 821a210a-8672-4eba-b27c-500bf63e58c1
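The bar in the monitor output takes only a few lines to render; this sketch matches the 20-character format shown above:

```python
def progress_bar(pct, width=20):
    # Render a fixed-width text bar like the monitor output above
    filled = int(pct / 100 * width)
    return "[" + "█" * filled + "░" * (width - filled) + f"] {pct:.1f}%"

print(progress_bar(4.9))  # [░░░░░░░░░░░░░░░░░░░░] 4.9%
print(progress_bar(5.8))  # [█░░░░░░░░░░░░░░░░░░░] 5.8%
```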

Finally, we cancel the task with option 7

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 7
📋 Available tasks:
1. ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | RUNNING | 6.1% | Migrated 6100/100000 records
Select a task (1-1) or press Enter to cancel: 1
✅ Selected task: 821a210a-8672-4eba-b27c-500bf63e58c1
[08/28/25 12:06:25] INFO     Server log: Task 821a210a-8672-4eba-b27c-500bf63e58c1 cancelled successfully    logging.py:40
🚫 Task 821a210a-8672-4eba-b27c-500bf63e58c1 cancelled. Status available at: task://status/821a210a-8672-4eba-b27c-500bf63e58c1

Now we choose option 5 again to see the task list

	
!cd MCP_durability_client && source .venv/bin/activate && uv run client.py
Available options:
1. Start data migration
2. Start batch processing
3. Start ML training
4. View server statistics
5. View task list
6. Monitor specific task
7. Cancel task
8. Exit
Select an option (1-8): 5
Task list:
ID: 821a210a-8672-4eba-b27c-500bf63e58c1 | CANCELLED | 6.3% | Migrated 6300/100000 records

We can see that the task we requested from the server now appears as cancelled (CANCELLED)
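Note that the progress stays frozen at the last completed batch: cancellation here is cooperative, with the task checking a cancel flag between batches instead of being killed mid-write. A sketch of that pattern (hypothetical names, not the server's actual code):

```python
def migrate(records, should_cancel, batch=1000):
    # Cooperative cancellation: check the flag between batches, so the
    # task always stops at a clean batch boundary
    done = 0
    while done < records:
        if should_cancel(done):
            return {"status": "cancelled", "migrated": done}
        done += batch
    return {"status": "completed", "migrated": done}

# Simulate a cancel request that takes effect once 6000 records are migrated
result = migrate(100_000, should_cancel=lambda done: done >= 6000)
print(result)  # {'status': 'cancelled', 'migrated': 6000}
```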
