Matrix Bot AI - Полная документация
Описание
Matrix Bot AI - это интеллектуальный чат-бот для платформы Matrix, работающий на n8n. Бот запоминает историю разговоров в PostgreSQL и использует OpenAI GPT-4o-mini для генерации ответов с учетом контекста предыдущих сообщений.
Основные возможности:
- Запоминание истории разговоров в разных комнатах
- Использование контекста при ответах
- Сохранение ID и имен собеседников
- Автоматическая сортировка истории от новых к старым
Архитектура
Компоненты
1. Matrix сервер - отправляет вебхук при новом сообщении 2. n8n Workflow - обрабатывает сообщение и генерирует ответ 3. PostgreSQL - хранит историю разговоров 4. OpenAI API - генерирует ответы через GPT-4o-mini 5. Matrix Bot сервис - отправляет сообщения обратно в чат
Логика работы
Поток данных
1. Webhook приходит → Формирует данные сообщения (текст, room_id, sender, sender_name) 2. Query Memory → Получает последние 10 сообщений из истории этой комнаты 3. Build Context → Собирает контекст разговора для AI с сохранением порядка (новые сообщения в конце) 4. AI Agent → OpenAI генерирует ответ на основе контекста 5. Save and Send → Сохраняет результат в БД и отправляет сообщение в чат 6. Response → Возвращает успешный ответ на вебхук
Структура данных
Таблица conversation_history:
- room_id - VARCHAR - ID комнаты Matrix
- sender - VARCHAR - Matrix ID отправителя (@admin:domain)
- sender_name - VARCHAR - Отображаемое имя в чате (Николай)
- message - TEXT - Текст сообщения
- bot_response - TEXT - Ответ бота
- created_at - TIMESTAMP - Время создания записи
Подготовка базы данных
Выполните эти SQL команды в PgAdmin:
-- Создание таблицы conversation_history CREATE TABLE conversation_history ( id SERIAL PRIMARY KEY, room_id VARCHAR(255) NOT NULL, sender VARCHAR(255) NOT NULL, sender_name VARCHAR(255), message TEXT, bot_response TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- Создание индекса для быстрого поиска по room_id CREATE INDEX idx_room_id ON conversation_history(room_id); CREATE INDEX idx_created_at ON conversation_history(created_at DESC); -- Добавление колонки sender_name если таблица уже существует ALTER TABLE conversation_history ADD COLUMN sender_name VARCHAR(255);
Nodes в Workflow
1. Webhook
Тип: n8n-nodes-base.webhook Назначение: Получает HTTP POST запрос от Matrix сервера Параметры:
- Path: matrix-bot-ai
- HTTP Method: POST
- Response Mode: responseNode
Входящие данные:
{
"body": {
"text": "сообщение",
"room_id": "!abc123:domain",
"sender": "@user:domain",
"sender_name": "User Name",
"is_dm": false
}
}
2. Format Input
Тип: n8n-nodes-base.code (JavaScript) Назначение: Форматирует входящие данные в стандартный вид
Код:
const body = $json.body; return { text: body.message || body.text, room_id: body.room_id, sender: body.sender || '', sender_name: body.sender_name || 'User', originalMessage: body.message || body.text, is_group: !body.is_dm };
Выходные данные:
{
"text": "сообщение",
"room_id": "!abc123:domain",
"sender": "@user:domain",
"sender_name": "User Name",
"originalMessage": "сообщение",
"is_group": false
}
3. Query Memory
Тип: n8n-nodes-base.postgres Назначение: Получает историю из PostgreSQL Операция: executeQuery
SQL Query:
SELECT sender, sender_name, message, bot_response, created_at FROM conversation_history WHERE room_id = '{{room_id}}' ORDER BY created_at DESC LIMIT 10
Параметр Query (в n8n):
SELECT sender, sender_name, message, bot_response, created_at FROM conversation_history WHERE room_id = '{{ "'" + $json.room_id.replace(/'/g, "''") + "'" }}' ORDER BY created_at DESC LIMIT 10
Выходные данные: Массив последних 10 сообщений (или пусто если их нет)
4. Build Context Code
Тип: n8n-nodes-base.code (JavaScript) Назначение: Собирает контекст разговора для AI из памяти
Код:
// Get Format Input from $node const formatInput = $node['Format Input'].json; // Get all items from Query Memory const allItems = $input.all(); const memoryItems = allItems.filter(item => item.json && item.json.message && item.json.sender) || []; let contextText = ''; if (memoryItems.length > 0) { contextText = 'Recent conversation history:\n\n'; // NEWEST FIRST - iterate from 0 to end (last item is most recent) for (let i = 0; i < memoryItems.length; i++) { const record = memoryItems[i].json || {}; if (record.message && record.sender && record.bot_response && record.created_at) { const ts = new Date(record.created_at).toLocaleString('ru-RU'); const senderName = record.sender_name || record.sender; contextText += `[${ts}] ${senderName}: ${record.message}\n`; contextText += ` → Jarvice: ${record.bot_response}\n\n`; } } } // IMPORTANT: Return ALL fields from Format Input return { context: contextText, text: formatInput.text, room_id: formatInput.room_id, sender: formatInput.sender, sender_name: formatInput.sender_name, originalMessage: formatInput.originalMessage, is_group: formatInput.is_group };
Выходные данные:
{
"context": "Recent conversation history:\n\n[время] Николай: 3+2\n → Jarvice: 5\n\n",
"text": "сообщение",
"room_id": "!abc123:domain",
"sender": "@user:domain",
"sender_name": "User Name",
"originalMessage": "сообщение",
"is_group": false
}
5. AI Agent
Тип: @n8n/n8n-nodes-langchain.agent Назначение: LangChain Agent использующий OpenAI модель
Параметры:
- Prompt Type: define
- Text:
json.text - System Message: (см. ниже - НЕ МЕНЯТЬ!)
System Message:
Ты полезный AI ассистент для Matrix чата по имени Jarvice🤖. Ты помощник Администратора (Николай). Отвечай кратко, используй emoji. Обязательно используй контекст предыдущих сообщений для лучшего ответа. ПРАВИЛА ЧАТА (сообщай их ТОЛЬКО если тебя о них спросили есть они или нет или просят сказать какие правила в этом чате, перечисляй только эти правила и не добавляй другие от себя). 1. Сообщения в чате сохраняются одну неделю. 2. Все сообщения старше недели удаляются. 3. В чате есть AI ассистент - может помочь с вопросами. 4. Соблюдай законодательство РФ.
6. OpenAI Model
Тип: @n8n/n8n-nodes-langchain.lmChatOpenAi Назначение: Подключение к OpenAI API
Параметры:
- Model: gpt-4o-mini
- Credentials: aitunnel (ваш OpenAI API key)
7. Save and Send
Тип: n8n-nodes-base.code (JavaScript) Назначение: Подготавливает данные для сохранения и отправки
Код:
const ai = $json.output || $node['AI Agent'].json.output || ''; const buildContext = $node['Build Context Code'].json; const roomId = buildContext.room_id || ''; const isGroup = buildContext.is_group || false; const sender = buildContext.sender || ''; const senderName = buildContext.sender_name || ''; const originalMessage = buildContext.originalMessage || ''; const esc = (s) => { if (!s) return 'NULL'; return "'" + String(s).replace(/'/g, "''") + "'"; }; const query = "INSERT INTO conversation_history (room_id, sender, sender_name, message, bot_response) VALUES (" + esc(roomId) + ", " + esc(sender) + ", " + esc(senderName) + ", " + esc(originalMessage) + ", " + esc(ai) + ")"; return { message: ai, room_id: roomId, sender_id: isGroup ? sender : null, sender_name: isGroup ? senderName : null, query: query };
Выходные данные:
{
"message": "Ответ бота",
"room_id": "!abc123:domain",
"sender_id": "@user:domain",
"sender_name": "User Name",
"query": "INSERT INTO conversation_history..."
}
8. Send to Matrix
Тип: n8n-nodes-base.httpRequest Назначение: Отправляет ответ в Matrix через Matrix Bot сервис
Параметры:
- Method: POST
- Send Body: true
- Specify Body: json
JSON Body:
{{ JSON.stringify({room_id: $json.room_id, message: $json.message, sender_id: $json.sender_id, sender_name: $json.sender_name}) }}
9. Save to Database
Тип: n8n-nodes-base.postgres Назначение: Сохраняет разговор в PostgreSQL
Параметры:
- Operation: executeQuery
- Query:
json.query
10. Respond to Webhook
Тип: n8n-nodes-base.respondToWebhook Назначение: Отправляет успешный ответ на вебхук
Параметры:
- Respond With: json
- Response Body:
true
Connections (Связи между nodes)
Webhook → Format Input Format Input → Query Memory (index 0) Format Input → Build Context Code (index 1) Query Memory → Build Context Code (index 0) Build Context Code → AI Agent OpenAI Model → AI Agent (ai_languageModel) AI Agent → Save and Send Save and Send → Send to Matrix Save and Send → Save to Database Send to Matrix → Respond to Webhook
JSON для импорта в n8n
Используйте этот JSON для создания workflow в n8n:
{
"name": "Matrix Bot AI",
"nodes": [
{
"parameters": {
"httpMethod": "POST",
"path": "matrix-bot-ai",
"responseMode": "responseNode",
"options": {}
},
"id": "webhook-1",
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"position": [-400, 304],
"webhookId": "2dad965f-08e2-412f-960a-b16c91dea686",
"typeVersion": 2
},
{
"parameters": {
"language": "javascript",
"jsCode": "const body = $json.body;\nreturn {\n text: body.message || body.text,\n room_id: body.room_id,\n sender: body.sender || '',\n sender_name: body.sender_name || 'User',\n originalMessage: body.message || body.text,\n is_group: !body.is_dm\n};"
},
"id": "format-input",
"name": "Format Input",
"type": "n8n-nodes-base.code",
"position": [-144, 304],
"typeVersion": 1
},
{
"parameters": {
"operation": "executeQuery",
"query": "={{ \"SELECT sender, sender_name, message, bot_response, created_at FROM conversation_history WHERE room_id = '\" + $json.room_id.replace(/'/g, \"''\") + \"' ORDER BY created_at DESC LIMIT 10\" }}",
"additionalFields": {}
},
"id": "query-memory",
"name": "Query Memory",
"type": "n8n-nodes-base.postgres",
"position": [0, 464],
"typeVersion": 1,
"credentials": {
"postgres": {
"id": "Sy23a0Zvp6PlFt6V",
"name": "Postgres matrix_bot_memory"
}
}
},
{
"parameters": {
"language": "javascript",
"jsCode": "// Get Format Input from $node\nconst formatInput = $node['Format Input'].json;\n\n// Get all items from Query Memory\nconst allItems = $input.all();\nconst memoryItems = allItems.filter(item => item.json && item.json.message && item.json.sender) || [];\n\nlet contextText = '';\n\nif (memoryItems.length > 0) {\n contextText = 'Recent conversation history:\\n\\n';\n // NEWEST FIRST - iterate from 0 to end (last item is most recent)\n for (let i = 0; i < memoryItems.length; i++) {\n const record = memoryItems[i].json || {};\n if (record.message && record.sender && record.bot_response && record.created_at) {\n const ts = new Date(record.created_at).toLocaleString('ru-RU');\n const senderName = record.sender_name || record.sender;\n contextText += `[${ts}] ${senderName}: ${record.message}\\n`;\n contextText += ` → Jarvice: ${record.bot_response}\\n\\n`;\n }\n }\n}\n\n// IMPORTANT: Return ALL fields from Format Input\nreturn {\n context: contextText,\n text: formatInput.text,\n room_id: formatInput.room_id,\n sender: formatInput.sender,\n sender_name: formatInput.sender_name,\n originalMessage: formatInput.originalMessage,\n is_group: formatInput.is_group\n};"
},
"id": "build-context-code",
"name": "Build Context Code",
"type": "n8n-nodes-base.code",
"position": [160, 304],
"typeVersion": 1
},
{
"parameters": {
"promptType": "define",
"text": "={{ $json.context + 'User (' + $json.sender_name + '): ' + $json.text }}",
"options": {
"systemMessage": "Ты полезный AI ассистент для Matrix чата по имени Jarvice🤖.\nТы помощник Администратора (Николай).\nОтвечай кратко, используй emoji.\nОбязательно используй контекст предыдущих сообщений для лучшего ответа. Отвечай с числовыми результатами и референциями на предыдущие вычисления."
}
},
"id": "agent-1",
"name": "AI Agent",
"type": "@n8n/n8n-nodes-langchain.agent",
"position": [304, 304],
"typeVersion": 1.9
},
{
"parameters": {
"model": {
"mode": "list",
"value": "gpt-4o-mini"
},
"options": {},
"builtInTools": {}
},
"id": "openai-1",
"name": "OpenAI Model",
"type": "@n8n/n8n-nodes-langchain.lmChatOpenAi",
"position": [352, 512],
"typeVersion": 1.3,
"credentials": {
"openAiApi": {
"id": "P1rSZeqDWn05zkK1",
"name": "aitunnel"
}
}
},
{
"parameters": {
"language": "javascript",
"jsCode": "const ai = $json.output || $node['AI Agent'].json.output || '';\nconst buildContext = $node['Build Context Code'].json;\n\nconst roomId = buildContext.room_id || '';\nconst isGroup = buildContext.is_group || false;\nconst sender = buildContext.sender || '';\nconst senderName = buildContext.sender_name || '';\nconst originalMessage = buildContext.originalMessage || '';\n\nconst esc = (s) => {\n if (!s) return 'NULL';\n return \"'\" + String(s).replace(/'/g, \"''\") + \"'\";\n};\n\nconst query = \"INSERT INTO conversation_history (room_id, sender, sender_name, message, bot_response) VALUES (\" +\n esc(roomId) + \", \" +\n esc(sender) + \", \" +\n esc(senderName) + \", \" +\n esc(originalMessage) + \", \" +\n esc(ai) + \")\";\n\nreturn {\n message: ai,\n room_id: roomId,\n sender_id: isGroup ? sender : null,\n sender_name: isGroup ? senderName : null,\n query: query\n};"
},
"id": "save-and-send",
"name": "Save and Send",
"type": "n8n-nodes-base.code",
"position": [560, 304],
"typeVersion": 1
},
{
"parameters": {
"method": "POST",
"url": "http://matrix_bot:5000/send_message",
"sendBody": true,
"specifyBody": "json",
"jsonBody": "={{ JSON.stringify({room_id: $json.room_id, message: $json.message, sender_id: $json.sender_id, sender_name: $json.sender_name}) }}",
"options": {}
},
"id": "http-1",
"name": "Send to Matrix",
"type": "n8n-nodes-base.httpRequest",
"position": [752, 208],
"typeVersion": 4.2
},
{
"parameters": {
"operation": "executeQuery",
"query": "={{ $json.query }}",
"additionalFields": {}
},
"id": "save-to-db",
"name": "Save to Database",
"type": "n8n-nodes-base.postgres",
"position": [752, 400],
"typeVersion": 1,
"credentials": {
"postgres": {
"id": "Sy23a0Zvp6PlFt6V",
"name": "Postgres matrix_bot_memory"
}
}
},
{
"parameters": {
"respondWith": "json",
"responseBody": "={{ JSON.stringify({ success: true }) }}",
"options": {}
},
"id": "respond-1",
"name": "Respond to Webhook",
"type": "n8n-nodes-base.respondToWebhook",
"position": [960, 208],
"typeVersion": 1.1
}
],
"connections": {
"Webhook": {
"main": [
[
{
"node": "Format Input",
"type": "main",
"index": 0
}
]
]
},
"AI Agent": {
"main": [
[
{
"node": "Save and Send",
"type": "main",
"index": 0
}
]
]
},
"Format Input": {
"main": [
[
{
"node": "Query Memory",
"type": "main",
"index": 0
},
{
"node": "Build Context Code",
"type": "main",
"index": 1
}
]
]
},
"OpenAI Model": {
"ai_languageModel": [
[
{
"node": "AI Agent",
"type": "ai_languageModel",
"index": 0
}
]
]
},
"Query Memory": {
"main": [
[
{
"node": "Build Context Code",
"type": "main",
"index": 0
}
]
]
},
"Save and Send": {
"main": [
[
{
"node": "Send to Matrix",
"type": "main",
"index": 0
},
{
"node": "Save to Database",
"type": "main",
"index": 0
}
]
]
},
"Send to Matrix": {
"main": [
[
{
"node": "Respond to Webhook",
"type": "main",
"index": 0
}
]
]
},
"Build Context Code": {
"main": [
[
{
"node": "AI Agent",
"type": "main",
"index": 0
}
]
]
}
},
"settings": {
"executionOrder": "v1",
"saveDataErrorExecution": "all",
"saveDataSuccessExecution": "all",
"saveManualExecutions": true,
"saveExecutionProgress": true
}
}
Процесс установки
1. Подготовка базы данных
1. Откройте PgAdmin 2. Выполните SQL команды из раздела «Подготовка базы данных» 3. Проверьте что таблица создана
2. Импорт workflow в n8n
1. Скопируйте JSON из раздела «JSON для импорта в n8n» 2. В n8n нажмите «+» > «Import from URL or paste JSON» 3. Вставьте JSON и нажмите «Import» 4. Откройте импортированный workflow
3. Настройка credentials
1. Откройте workflow 2. Найдите node «Query Memory» и установите credentials PostgreSQL 3. Найдите node «OpenAI Model» и установите credentials OpenAI API 4. Найдите node «Save to Database» и установите credentials PostgreSQL
4. Проверка и активация
1. Нажмите «Test» чтобы проверить webhook URL 2. Скопируйте webhook URL 3. Настройте Matrix сервер на отправку вебхуков на этот URL 4. Активируйте workflow (кнопка «Activate»)
Примеры использования
Пример 1: Простой расчет
Пользователь: Сколько будет 7 + 3? Бот: 7 + 3 = 10 ✨
Пользователь: А сколько было в предыдущем вычислении? Бот: В предыдущем вычислении было: 7 + 3 = 10 📝
Пример 2: Разговор с контекстом
Пользователь: Какой сегодня день? Бот: Сегодня День недели, число месяца 📅
Пользователь: А вчера какой был? Бот: Вчера был [предыдущий день] 📆
Решение проблем
Бот не отвечает
1. Проверьте что workflow активен 2. Проверьте credentials PostgreSQL и OpenAI 3. Проверьте что Matrix сервер отправляет вебхуки на правильный URL 4. Посмотрите Executions в n8n - там будут ошибки
Сообщение: column "sender_name" does not exist
Выполните SQL команду:
ALTER TABLE conversation_history ADD COLUMN sender_name VARCHAR(255);
Бот забывает историю
1. Проверьте что SQL query в «Query Memory» выглядит правильно 2. Проверьте что сообщения сохраняются в БД (Look in Save to Database node output) 3. Проверьте что room_id совпадает в разных сообщениях
OpenAI API error
1. Проверьте что API key валидный 2. Проверьте что у вас есть баланс в OpenAI 3. Проверьте что используется правильная модель (gpt-4o-mini)
Matrix Bot сервис (Python)
Описание
Matrix Bot - это Python сервис, который подключается к Matrix серверу и:
- Получает новые сообщения из чатов
- Отправляет их в n8n webhook для обработки AI
- Получает ответы через HTTP API и отправляет в Matrix
- Поддерживает шифрованные комнаты (Megolm)
- Приветствует новых пользователей в комнате «Общая»
Требования
- Python 3.9+
- python-nio (Matrix client library)
- aiohttp (async HTTP client)
- python-dotenv (для конфигурации)
Установка
pip install python-nio aiohttp python-dotenv
Конфигурация (.env файл)
MATRIX_HOMESERVER=https://syna.digitizepro.tech MATRIX_USER=@bot:syna.digitizepro.tech MATRIX_PASSWORD=your_password_here PICKLE_KEY=your_encryption_key_here N8N_WEBHOOK_URL=http://n8n:5678/webhook/matrix-bot-ai N8N_TIMEOUT=30 HTTP_API_PORT=5000
Полный код бота
Файл: matrix_bot.py
import asyncio import os import logging from logging.handlers import RotatingFileHandler import json import aiohttp from pathlib import Path from aiohttp import web from nio import ( AsyncClient, AsyncClientConfig, RoomMessageText, MegolmEvent, InviteMemberEvent, RoomMemberEvent, LoginResponse, KeyVerificationStart, KeyVerificationCancel, KeyVerificationKey, KeyVerificationMac, LocalProtocolError, ) # ===== Настройка логирования ===== os.makedirs('/app/logs', exist_ok=True) formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) file_handler = RotatingFileHandler( '/app/logs/matrix_bot.log', maxBytes=50*1024*1024, backupCount=10, encoding='utf-8' ) file_handler.setFormatter(formatter) file_handler.setLevel(logging.INFO) console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) console_handler.setLevel(logging.INFO) root_logger = logging.getLogger() root_logger.setLevel(logging.INFO) root_logger.addHandler(file_handler) root_logger.addHandler(console_handler) logger = logging.getLogger("matrix-bot") # ===== Конфигурация ===== HOMESERVER = os.getenv("MATRIX_HOMESERVER", "https://syna.digitizepro.tech") USER = os.getenv("MATRIX_USER", "@bot:syna.digitizepro.tech") PASSWORD = os.getenv("MATRIX_PASSWORD") STORE_PATH = "/app/store" PICKLE_KEY = os.getenv("PICKLE_KEY", "default_insecure_key_change_me") BOT_NICKNAME = "ChatBot" CREDENTIALS_FILE = os.path.join(STORE_PATH, "credentials.json") # n8n интеграция N8N_WEBHOOK_URL = os.getenv("N8N_WEBHOOK_URL", "http://n8n:5678/webhook/matrix-bot-ai") N8N_TIMEOUT = int(os.getenv("N8N_TIMEOUT", "30")) # HTTP API для ответов от n8n HTTP_API_PORT = int(os.getenv("HTTP_API_PORT", "5000")) # Настройка клиента с шифрованием config = AsyncClientConfig( encryption_enabled=True, pickle_key=PICKLE_KEY, store_sync_tokens=True ) # Глобальные переменные client = None app = None http_runner = None sync_error_count = 0 # ===== НОВОЕ: Функция очистки sync токенов ===== def clear_sync_tokens(): """Очищает sync токены для полной пересинхронизации после перезагрузки""" try: store_path = Path(STORE_PATH) # Список файлов токенов которые нужно удалить token_files = [ store_path / "sync_token.json", store_path / ".matrix-nio-sync-token" ] for token_file in token_files: if token_file.exists(): token_file.unlink() logger.info(f"🗑️ Удалён sync token файл: {token_file}") # Удаляем базу комнат для полной пересинхронизации db_file = store_path / "rooms.json" if db_file.exists(): db_file.unlink() logger.info(f"🗑️ Удалена база комнат для пересинхронизации") logger.info("✅ Sync токены очищены, будет выполнена полная пересинхронизация") except Exception as e: logger.warning(f"⚠️ Ошибка очистки sync tokens: {e}") # ===== HTTP API обработчики ===== async def send_message_handler(request): """HTTP endpoint для отправки сообщения в Matrix от n8n""" try: data = await request.json() room_id = data.get("room_id") message = data.get("message") reply_to_event_id = data.get("reply_to_event_id") sender_id = data.get("sender_id") sender_name = data.get("sender_name") if not room_id or not message: return web.json_response( {"error": "room_id and message required"}, status=400 ) logger.info(f"📤 Получен запрос отправить сообщение в комнату {room_id}") logger.info(f"💬 Текст: {message[:100]}") if reply_to_event_id: logger.info(f"↩️ Reply to event: {reply_to_event_id}") if sender_id: logger.info(f"👤 Mention user: {sender_name} ({sender_id})") # Формируем content сообщения content = { "msgtype": "m.text", "body": message } # Если есть sender_id, добавляем HTML mention if sender_id and sender_name: content["format"] = "org.matrix.custom.html" content["formatted_body"] = f'<a href="https://matrix.to/#/{sender_id}">{sender_name}</a>: {message}' # Добавляем reply-to если указан event_id if reply_to_event_id: content["m.relates_to"] = { "m.in_reply_to": { "event_id": reply_to_event_id } } if "formatted_body" not in content: content["format"] = "org.matrix.custom.html" content["formatted_body"] = message content["formatted_body"] = f'<mx-reply><blockquote><a href="https://matrix.to/#/{room_id}/{reply_to_event_id}">In reply to</a></blockquote></mx-reply>{content["formatted_body"]}' # Отправляем сообщение в Matrix try: await client.room_send( room_id=room_id, message_type="m.room.message", content=content ) logger.info(f"✅ Сообщение отправлено в {room_id}") return web.json_response({"status": "ok", "room_id": room_id}) except LocalProtocolError as e: logger.error(f"❌ Ошибка отправки (protocol): {e}") return web.json_response( {"error": f"Protocol error: {str(e)}"}, status=500 ) except Exception as e: logger.error(f"❌ Ошибка отправки: {e}") return web.json_response( {"error": f"Send error: {str(e)}"}, status=500 ) except json.JSONDecodeError: return web.json_response( {"error": "Invalid JSON"}, status=400 ) except Exception as e: logger.error(f"❌ Ошибка в handler: {e}") return web.json_response( {"error": str(e)}, status=500 ) async def health_handler(request): """Health check endpoint""" is_connected = client is not None and client.logged_in if client else False return web.json_response({ "status": "ok", "connected": is_connected, "timestamp": asyncio.get_event_loop().time() }) # ===== Функции ===== def save_credentials(device_id, access_token): """Сохраняет учётные данные в файл""" try: Path(STORE_PATH).mkdir(parents=True, exist_ok=True) with open(CREDENTIALS_FILE, "w") as f: json.dump({ "device_id": device_id, "access_token": access_token }, f) logger.info(f"✅ Credentials сохранены в {CREDENTIALS_FILE}") except Exception as e: logger.error(f"❌ Ошибка сохранения credentials: {e}") def load_credentials(): """Загружает учётные данные из файла""" try: if os.path.exists(CREDENTIALS_FILE): with open(CREDENTIALS_FILE, "r") as f: creds = json.load(f) logger.info(f"✅ Credentials загружены: device_id={creds.get('device_id')}") return creds.get("device_id"), creds.get("access_token") except Exception as e: logger.warning(f"Не удалось загрузить credentials: {e}") return None, None async def send_to_n8n(text, room_id, sender, is_dm, sender_name=None, event_id=None): """Отправляет сообщение в n8n webhook для обработки AI""" payload = { "text": text, "room_id": room_id, "sender": sender, "sender_name": sender_name or sender, "is_dm": is_dm, "event_id": event_id } try: async with aiohttp.ClientSession() as session: async with session.post( N8N_WEBHOOK_URL, json=payload, timeout=aiohttp.ClientTimeout(total=N8N_TIMEOUT) ) as resp: if resp.status == 200: result = await resp.json() logger.info(f"✅ n8n обработал сообщение") return result else: text_resp = await resp.text() logger.error(f"❌ n8n вернул ошибку {resp.status}: {text_resp[:200]}") return None except asyncio.TimeoutError: logger.error(f"⏱️ Timeout при обращении к n8n (>{N8N_TIMEOUT}s)") return None except Exception as e: logger.error(f"❌ Ошибка при отправке в n8n: {e}") return None async def message_callback(room, event): """Обработка текстовых сообщений""" logger.info(f"📨 Event type: {type(event).__name__}, sender: {event.sender}, room: {room.room_id}") # Расшифровка Megolm-сообщений if isinstance(event, MegolmEvent): logger.info(f"🔐 Decrypting megolm event...") try: event = await client.decrypt_event(event) if not event: logger.warning(f"Failed to decrypt event in {room.room_id}") return logger.info(f"✅ Decrypted successfully") except Exception as e: logger.error(f"Decrypt error: {e}") return if not isinstance(event, RoomMessageText): logger.debug(f"Ignoring non-text event: {type(event).__name__}") return if event.sender == client.user_id: logger.debug(f"Ignoring own message") return text = (event.body or "").strip() if not text: return logger.info(f"💬 Message text: {text[:100]}") logger.info(f"🆔 Event ID: {event.event_id}") # ===== ИСПРАВЛЕНО: Правильная обработка None в member_count ===== member_count = getattr(room, 'member_count', None) # Если member_count None, пробуем другие способы if member_count is None or member_count == 0: member_count = len(room.users) if hasattr(room, 'users') and room.users else 0 if member_count == 0 and hasattr(room, 'joined_count'): member_count = getattr(room, 'joined_count', 0) if member_count == 0 and hasattr(room, 'summary'): member_count = getattr(room.summary, 'joined_member_count', 0) # ===== КРИТИЧНО: Проверка что member_count не None перед сравнением ===== # Если всё ещё None (после перезагрузки/бекапа), используем default значение if member_count is None: logger.warning(f"⚠️ member_count is None (после перезагрузки?), используем default=2 (DM)") member_count = 2 logger.info(f"🔍 Room info: members={member_count}, name={room.display_name or room.name or 'None'}") # Определяем тип комнаты if member_count == 2: is_dm = True logger.info(f"🔍 Room type: DM (2 members)") elif member_count <= 1: # ← Теперь member_count никогда не будет None! is_dm = True logger.info(f"🔍 Room type: DM (fallback, members={member_count})") else: is_dm = False logger.info(f"🔍 Room type: GROUP ({member_count} members)") # Проверяем нужно ли отвечать if is_dm: cleaned = text logger.info(f"💬 DM message, sending to n8n") else: # В группе только на обращение if not text.lower().startswith(f"{BOT_NICKNAME.lower()}:"): logger.info(f"⏭️ Ignoring group message without mention") return cleaned = text[len(f"{BOT_NICKNAME}:"):].strip() if not cleaned: return logger.info(f"💬 Group mention detected, sending to n8n") # Отправляем сообщение в n8n для обработки AI logger.info(f"🚀 Sending to n8n webhook...") # Получаем имя отправителя из комнаты sender_name = None if event.sender in room.users: user = room.users[event.sender] sender_name = user.name or event.sender result = await send_to_n8n(cleaned, room.room_id, event.sender, is_dm, sender_name, event.event_id) if not result: fallback_reply = "Извините, возникла ошибка при обработке вашего сообщения. Попробуйте позже." try: await client.room_send( room_id=room.room_id, message_type="m.room.message", content={"msgtype": "m.text", "body": fallback_reply} ) logger.warning(f"📤 Отправлен fallback ответ") except Exception as e: logger.error(f"❌ Ошибка отправки fallback: {e}") async def invite_callback(room, event): """Обработка приглашений в комнаты""" try: await client.join(room.room_id) logger.info(f"✅ Присоединился к комнате: {room.room_id}") except Exception as e: logger.exception(f"❌ Ошибка join: {e}") async def member_callback(room, event): """Обработка событий присоединения/выхода пользователей""" try: if event.membership != "join": return if event.state_key == client.user_id: logger.debug(f"Ignoring own membership event") return room_name = room.display_name or room.name or "" logger.info(f"👥 Member event in room: {room_name} ({room.room_id})") if room_name.lower() != "общая": logger.debug(f"Skipping welcome message - not 'Общая' room") return new_user_id = event.state_key new_user_name = new_user_id if new_user_id in room.users: user = room.users[new_user_id] new_user_name = user.name or new_user_id logger.info(f"👋 Sending welcome message to {new_user_name} in {room_name}") plain_message = f"{new_user_name}: Добро пожаловать в чат! Я AI ChatBot и могу помочь с различными вопросами, если обратиться ко мне (в чате необходимо нажать на мое имя что бы написать мне вопрос). Приватные чаты шифруются. Срок хранения переписки на сервере одна неделя." html_message = f'<a href="https://matrix.to/#/{new_user_id}">{new_user_name}</a>: Добро пожаловать в чат! Я AI ChatBot и могу помочь с различными вопросами, если обратиться ко мне (в чате необходимо нажать на мое имя что бы написать мне вопрос). Приватные чаты шифруются. Срок хранения переписки на сервере одна неделя.' await client.room_send( room_id=room.room_id, message_type="m.room.message", content={ "msgtype": "m.text", "body": plain_message, "format": "org.matrix.custom.html", "formatted_body": html_message } ) logger.info(f"✅ Welcome message sent to {new_user_name}") except Exception as e: logger.error(f"❌ Ошибка в member_callback: {e}") logger.exception(e) async def to_device_callback(event): """Обработка to-device сообщений для обмена ключами""" logger.debug(f"To-device event: {type(event).__name__}") # ===== HTTP сервер ===== async def start_http_server(): """Запускает HTTP API сервер для получения ответов от n8n""" global app, http_runner app = web.Application() app.router.add_post('/send_message', send_message_handler) app.router.add_get('/health', health_handler) http_runner = web.AppRunner(app) await http_runner.setup() site = web.TCPSite(http_runner, '0.0.0.0', HTTP_API_PORT) await site.start() logger.info(f"🌐 HTTP API запущен на порту {HTTP_API_PORT}") logger.info(f"📡 Endpoints: /send_message (POST), /health (GET)") # ===== Основной цикл ===== async def main(): global client, sync_error_count logger.info(f"🚀 Запуск Matrix бота") logger.info(f"🏠 Homeserver: {HOMESERVER}") logger.info(f"👤 User: {USER}") logger.info(f"📡 n8n webhook: {N8N_WEBHOOK_URL}") # Загружаем сохранённые credentials device_id, access_token = load_credentials() # Создаём клиента с сохранёнными данными if device_id and access_token: client = AsyncClient( HOMESERVER, USER, config=config, store_path=STORE_PATH, device_id=device_id ) client.access_token = access_token client.user_id = USER logger.info(f"✅ Используем существующую сессию, device_id: {device_id}") else: client = AsyncClient(HOMESERVER, USER, config=config, store_path=STORE_PATH) client.user_id = USER logger.info("Выполняется первичный логин...") if not PASSWORD: logger.error("❌ MATRIX_PASSWORD не установлен в .env") return login_resp = await client.login(PASSWORD, device_name="matrix-bot") if not isinstance(login_resp, LoginResponse): logger.error(f"❌ Login failed: {login_resp}") return logger.info(f"✅ Залогинились как {USER}, device_id: {client.device_id}") save_credentials(client.device_id, client.access_token) # Загрузка crypto store try: client.load_store() logger.info("✅ Crypto store загружен") except Exception as e: logger.warning(f"Crypto store не загружен: {e}") # ===== НОВОЕ: Первичная синхронизация с обработкой next_batch ошибок ===== logger.info("⏳ Выполняется первичная синхронизация...") max_sync_retries = 3 for sync_attempt in range(max_sync_retries): try: sync_response = await client.sync(timeout=30000, full_state=True) logger.info(f"✅ Первичная синхронизация завершена") sync_error_count = 0 break except Exception as e: error_str = str(e) sync_attempt_num = sync_attempt + 1 # Проверяем на ошибку next_batch if "next_batch" in error_str or "'next_batch' is a required property" in error_str: logger.error(f"❌ Sync error (попытка {sync_attempt_num}/{max_sync_retries}): next_batch missing") logger.warning(f"🔄 Это происходит после перезагрузки - Matrix сервер вернул некорректный ответ") if sync_attempt_num < max_sync_retries: logger.info(f"🗑️ Очищаю sync токены и пробую ещё раз...") clear_sync_tokens() await asyncio.sleep(5) # Дождёмся перед следующей попыткой else: logger.error(f"❌ Не удалось выполнить синхронизацию после {max_sync_retries} попыток") logger.info("💡 Попробуем продолжить и восстановиться в основном цикле...") break else: logger.error(f"❌ Ошибка синхронизации (попытка {sync_attempt_num}/{max_sync_retries}): {e}") if sync_attempt_num < max_sync_retries: await asyncio.sleep(5) else: logger.error(f"❌ Не удалось выполнить синхронизацию после {max_sync_retries} попыток") logger.info("💡 Попробуем продолжить и восстановиться в основном цикле...") break # Загрузка ключей шифрования при необходимости if client.should_upload_keys: logger.info("📤 Загрузка ключей шифрования...") try: await client.keys_upload() logger.info("✅ Ключи шифрования загружены") except Exception as e: logger.warning(f"⚠️ Ошибка загрузки ключей: {e}") # Запрос ключей для шифрованных комнат logger.info("🔑 Запрос ключей для комнат...") try: await client.keys_query() shared_count = 0 for room_id, room in client.rooms.items(): if room.encrypted: try: await client.share_group_session(room_id, ignore_unverified_devices=True) shared_count += 1 except Exception as e: logger.warning(f"⚠️ Share session error for {room_id}: {e}") if shared_count > 0: logger.info(f"🔐 Ключи общей сессии переданы для {shared_count} шифрованных комнат") except Exception as e: if "No key query required" not in str(e): logger.warning(f"⚠️ Keys query error: {e}") # Регистрируем callbacks для новых событий client.add_event_callback(message_callback, RoomMessageText) client.add_event_callback(message_callback, MegolmEvent) client.add_event_callback(invite_callback, InviteMemberEvent) client.add_event_callback(member_callback, RoomMemberEvent) client.add_to_device_callback(to_device_callback, (KeyVerificationStart, KeyVerificationCancel, KeyVerificationKey, KeyVerificationMac)) # Запускаем HTTP сервер await start_http_server() logger.info(f"🤖 Бот полностью готов к работе!") logger.info(f"✅ Listening for messages...") try: # ===== НОВОЕ: Основной цикл с обработкой ошибок next_batch ===== consecutive_errors = 0 while True: try: await client.sync(timeout=30000) consecutive_errors = 0 # Reset при успехе except Exception as e: error_str = str(e) consecutive_errors += 1 # Проверяем на ошибку next_batch if "next_batch" in error_str or "'next_batch' is a required property" in error_str: logger.warning(f"⚠️ Sync error #{consecutive_errors}: next_batch missing (обычно после перезагрузки)") if consecutive_errors == 1: logger.info("🗑️ Первая ошибка - очищаю sync токены...") clear_sync_tokens() # Экспоненциальная задержка: 5s, 10s, 20s, 40s... delay = min(5 * (2 ** (consecutive_errors - 1)), 120) logger.info(f"⏳ Жду {delay}s перед повторной попыткой...") await asyncio.sleep(delay) if consecutive_errors > 5: logger.error("❌ Слишком много consecutive ошибок (>5), контейнер перезагружается...") break else: # Другие ошибки logger.error(f"❌ Ошибка sync #{consecutive_errors}: {e}") if consecutive_errors > 3: logger.warning(f"⚠️ {consecutive_errors} ошибок подряд, даю больше времени...") await asyncio.sleep(30) else: await asyncio.sleep(10) except KeyboardInterrupt: logger.info("⏹️ Получен сигнал завершения") finally: if http_runner: await http_runner.cleanup() if client: await client.close() logger.info("👋 Бот остановлен") if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: logger.info("Bot остановлен пользователем") except Exception as e: logger.exception(f"❌ Fatal error: {e}")
Запуск бота
# Создаём .env файл с конфигурацией cat > .env << EOF MATRIX_HOMESERVER=https://syna.digitizepro.tech MATRIX_USER=@bot:syna.digitizepro.tech MATRIX_PASSWORD=your_password PICKLE_KEY=your_encryption_key N8N_WEBHOOK_URL=http://n8n:5678/webhook/matrix-bot-ai N8N_TIMEOUT=30 HTTP_API_PORT=5000 EOF # Запускаем бот python matrix_bot.py
Использование с Docker
requirements.txt:
python-nio>=0.24.0 aiohttp>=3.8.0 python-dotenv>=0.21.0
Dockerfile:
FROM python:3.11-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY matrix_bot.py . CMD ["python", "matrix_bot.py"]
Основные функции
Message Callback
Обрабатывает текстовые сообщения:
- Расшифровывает Megolm-сообщения (из шифрованных комнат)
- Определяет тип комнаты (DM или GROUP)
- В DM отвечает на все сообщения
- В GROUP отвечает только на обращение (
ChatBot: вопрос) - Отправляет в n8n webhook
Send to n8n
Отправляет сообщение в n8n:
- text - очищенный текст
- room_id - ID комнаты
- sender - Matrix ID отправителя
- sender_name - Имя отправителя
- is_dm - флаг типа комнаты
- event_id - ID события
Send Message Handler
HTTP endpoint для ответов от n8n:
- Получает сообщение и параметры
- Добавляет HTML mention если есть sender_id
- Отправляет в Matrix комнату
Технические детали
Порядок сообщений в контексте
Query Memory возвращает сообщения в порядке DESC (новые сверху), но Build Context Code переворачивает их так чтобы новые сообщения были в конце контекста. Это важно чтобы AI правильно понял что является «последним» сообщением.
Экранирование SQL
В Save and Send используется функция esc() которая экранирует одинарные кавычки ( → '') чтобы избежать SQL инъекций.
Fallback значения
Если sender_name не пришел, используется sender ID Если сообщение не пришло, используется пустая строка Все это обрабатывается в Build Context Code и Save and Send
Дополнительная информация
Версия Workflow: 236 Последнее обновление: 2025-11-29 Статус: Production Ready ✅
Contacts: Если возникли вопросы по workflow, смотрите логи в Executions