Показать страницуИстория страницыСсылки сюдаНаверх Эта страница только для чтения. Вы можете посмотреть её исходный текст, но не можете его изменить. Сообщите администратору, если считаете, что это неправильно. ====== Matrix Bot AI - Полная документация ====== ===== Описание ====== Matrix Bot AI - это интеллектуальный чат-бот для платформы Matrix, работающий на n8n. Бот запоминает историю разговоров в PostgreSQL и использует OpenAI GPT-4o-mini для генерации ответов с учетом контекста предыдущих сообщений. **Основные возможности:** * Запоминание истории разговоров в разных комнатах * Использование контекста при ответах * Сохранение ID и имен собеседников * Автоматическая сортировка истории от новых к старым ===== Архитектура ====== ==== Компоненты ==== **1. Matrix сервер** - отправляет вебхук при новом сообщении **2. n8n Workflow** - обрабатывает сообщение и генерирует ответ **3. PostgreSQL** - хранит историю разговоров **4. OpenAI API** - генерирует ответы через GPT-4o-mini **5. Matrix Bot сервис** - отправляет сообщения обратно в чат ===== Логика работы ====== ==== Поток данных ==== 1. **Webhook приходит** → Формирует данные сообщения (текст, room_id, sender, sender_name) 2. **Query Memory** → Получает последние 10 сообщений из истории этой комнаты 3. **Build Context** → Собирает контекст разговора для AI с сохранением порядка (новые сообщения в конце) 4. **AI Agent** → OpenAI генерирует ответ на основе контекста 5. **Save and Send** → Сохраняет результат в БД и отправляет сообщение в чат 6. **Response** → Возвращает успешный ответ на вебхук ==== Структура данных ==== **Таблица conversation_history:** * **room_id** - VARCHAR - ID комнаты Matrix * **sender** - VARCHAR - Matrix ID отправителя (@admin:domain) * **sender_name** - VARCHAR - Отображаемое имя в чате (Николай) * **message** - TEXT - Текст сообщения * **bot_response** - TEXT - Ответ бота * **created_at** - TIMESTAMP - Время создания записи ===== Подготовка базы данных ====== Выполните эти SQL команды в PgAdmin: <code sql> -- Создание таблицы conversation_history CREATE TABLE conversation_history ( id SERIAL PRIMARY KEY, room_id VARCHAR(255) NOT NULL, sender VARCHAR(255) NOT NULL, sender_name VARCHAR(255), message TEXT, bot_response TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- Создание индекса для быстрого поиска по room_id CREATE INDEX idx_room_id ON conversation_history(room_id); CREATE INDEX idx_created_at ON conversation_history(created_at DESC); -- Добавление колонки sender_name если таблица уже существует ALTER TABLE conversation_history ADD COLUMN sender_name VARCHAR(255); </code> ===== Nodes в Workflow ====== ==== 1. Webhook ==== **Тип:** n8n-nodes-base.webhook **Назначение:** Получает HTTP POST запрос от Matrix сервера **Параметры:** * Path: matrix-bot-ai * HTTP Method: POST * Response Mode: responseNode **Входящие данные:** <code json> { "body": { "text": "сообщение", "room_id": "!abc123:domain", "sender": "@user:domain", "sender_name": "User Name", "is_dm": false } } </code> ==== 2. Format Input ==== **Тип:** n8n-nodes-base.code (JavaScript) **Назначение:** Форматирует входящие данные в стандартный вид **Код:** <code javascript> const body = $json.body; return { text: body.message || body.text, room_id: body.room_id, sender: body.sender || '', sender_name: body.sender_name || 'User', originalMessage: body.message || body.text, is_group: !body.is_dm }; </code> **Выходные данные:** <code json> { "text": "сообщение", "room_id": "!abc123:domain", "sender": "@user:domain", "sender_name": "User Name", "originalMessage": "сообщение", "is_group": false } </code> ==== 3. Query Memory ==== **Тип:** n8n-nodes-base.postgres **Назначение:** Получает историю из PostgreSQL **Операция:** executeQuery **SQL Query:** <code sql> SELECT sender, sender_name, message, bot_response, created_at FROM conversation_history WHERE room_id = '{{room_id}}' ORDER BY created_at DESC LIMIT 10 </code> **Параметр Query (в n8n):** <code> SELECT sender, sender_name, message, bot_response, created_at FROM conversation_history WHERE room_id = '{{ "'" + $json.room_id.replace(/'/g, "''") + "'" }}' ORDER BY created_at DESC LIMIT 10 </code> **Выходные данные:** Массив последних 10 сообщений (или пусто если их нет) ==== 4. Build Context Code ==== **Тип:** n8n-nodes-base.code (JavaScript) **Назначение:** Собирает контекст разговора для AI из памяти **Код:** <code javascript> // Get Format Input from $node const formatInput = $node['Format Input'].json; // Get all items from Query Memory const allItems = $input.all(); const memoryItems = allItems.filter(item => item.json && item.json.message && item.json.sender) || []; let contextText = ''; if (memoryItems.length > 0) { contextText = 'Recent conversation history:\n\n'; // NEWEST FIRST - iterate from 0 to end (last item is most recent) for (let i = 0; i < memoryItems.length; i++) { const record = memoryItems[i].json || {}; if (record.message && record.sender && record.bot_response && record.created_at) { const ts = new Date(record.created_at).toLocaleString('ru-RU'); const senderName = record.sender_name || record.sender; contextText += `[${ts}] ${senderName}: ${record.message}\n`; contextText += ` → Jarvice: ${record.bot_response}\n\n`; } } } // IMPORTANT: Return ALL fields from Format Input return { context: contextText, text: formatInput.text, room_id: formatInput.room_id, sender: formatInput.sender, sender_name: formatInput.sender_name, originalMessage: formatInput.originalMessage, is_group: formatInput.is_group }; </code> **Выходные данные:** <code json> { "context": "Recent conversation history:\n\n[время] Николай: 3+2\n → Jarvice: 5\n\n", "text": "сообщение", "room_id": "!abc123:domain", "sender": "@user:domain", "sender_name": "User Name", "originalMessage": "сообщение", "is_group": false } </code> ==== 5. AI Agent ==== **Тип:** @n8n/n8n-nodes-langchain.agent **Назначение:** LangChain Agent использующий OpenAI модель **Параметры:** * Prompt Type: define * Text: ''{{ $json.context + 'User (' + $json.sender_name + '): ' + $json.text }}'' * System Message: (см. ниже - НЕ МЕНЯТЬ!) **System Message:** <code> Ты полезный AI ассистент для Matrix чата по имени Jarvice🤖. Ты помощник Администратора (Николай). Отвечай кратко, используй emoji. Обязательно используй контекст предыдущих сообщений для лучшего ответа. ПРАВИЛА ЧАТА (сообщай их ТОЛЬКО если тебя о них спросили есть они или нет или просят сказать какие правила в этом чате, перечисляй только эти правила и не добавляй другие от себя). 1. Сообщения в чате сохраняются одну неделю. 2. Все сообщения старше недели удаляются. 3. В чате есть AI ассистент - может помочь с вопросами. 4. Соблюдай законодательство РФ. </code> ==== 6. OpenAI Model ==== **Тип:** @n8n/n8n-nodes-langchain.lmChatOpenAi **Назначение:** Подключение к OpenAI API **Параметры:** * Model: gpt-4o-mini * Credentials: aitunnel (ваш OpenAI API key) ==== 7. Save and Send ==== **Тип:** n8n-nodes-base.code (JavaScript) **Назначение:** Подготавливает данные для сохранения и отправки **Код:** <code javascript> const ai = $json.output || $node['AI Agent'].json.output || ''; const buildContext = $node['Build Context Code'].json; const roomId = buildContext.room_id || ''; const isGroup = buildContext.is_group || false; const sender = buildContext.sender || ''; const senderName = buildContext.sender_name || ''; const originalMessage = buildContext.originalMessage || ''; const esc = (s) => { if (!s) return 'NULL'; return "'" + String(s).replace(/'/g, "''") + "'"; }; const query = "INSERT INTO conversation_history (room_id, sender, sender_name, message, bot_response) VALUES (" + esc(roomId) + ", " + esc(sender) + ", " + esc(senderName) + ", " + esc(originalMessage) + ", " + esc(ai) + ")"; return { message: ai, room_id: roomId, sender_id: isGroup ? sender : null, sender_name: isGroup ? senderName : null, query: query }; </code> **Выходные данные:** <code json> { "message": "Ответ бота", "room_id": "!abc123:domain", "sender_id": "@user:domain", "sender_name": "User Name", "query": "INSERT INTO conversation_history..." } </code> ==== 8. Send to Matrix ==== **Тип:** n8n-nodes-base.httpRequest **Назначение:** Отправляет ответ в Matrix через Matrix Bot сервис **Параметры:** * URL: http://matrix_bot:5000/send_message * Method: POST * Send Body: true * Specify Body: json **JSON Body:** <code> {{ JSON.stringify({room_id: $json.room_id, message: $json.message, sender_id: $json.sender_id, sender_name: $json.sender_name}) }} </code> ==== 9. Save to Database ==== **Тип:** n8n-nodes-base.postgres **Назначение:** Сохраняет разговор в PostgreSQL **Параметры:** * Operation: executeQuery * Query: ''{{ $json.query }}'' ==== 10. Respond to Webhook ==== **Тип:** n8n-nodes-base.respondToWebhook **Назначение:** Отправляет успешный ответ на вебхук **Параметры:** * Respond With: json * Response Body: ''{{ JSON.stringify({ success: true }) }}'' ===== Connections (Связи между nodes) ====== **Webhook** → Format Input **Format Input** → Query Memory (index 0) **Format Input** → Build Context Code (index 1) **Query Memory** → Build Context Code (index 0) **Build Context Code** → AI Agent **OpenAI Model** → AI Agent (ai_languageModel) **AI Agent** → Save and Send **Save and Send** → Send to Matrix **Save and Send** → Save to Database **Send to Matrix** → Respond to Webhook ===== JSON для импорта в n8n ====== Используйте этот JSON для создания workflow в n8n: <code json> { "name": "Matrix Bot AI", "nodes": [ { "parameters": { "httpMethod": "POST", "path": "matrix-bot-ai", "responseMode": "responseNode", "options": {} }, "id": "webhook-1", "name": "Webhook", "type": "n8n-nodes-base.webhook", "position": [-400, 304], "webhookId": "2dad965f-08e2-412f-960a-b16c91dea686", "typeVersion": 2 }, { "parameters": { "language": "javascript", "jsCode": "const body = $json.body;\nreturn {\n text: body.message || body.text,\n room_id: body.room_id,\n sender: body.sender || '',\n sender_name: body.sender_name || 'User',\n originalMessage: body.message || body.text,\n is_group: !body.is_dm\n};" }, "id": "format-input", "name": "Format Input", "type": "n8n-nodes-base.code", "position": [-144, 304], "typeVersion": 1 }, { "parameters": { "operation": "executeQuery", "query": "={{ \"SELECT sender, sender_name, message, bot_response, created_at FROM conversation_history WHERE room_id = '\" + $json.room_id.replace(/'/g, \"''\") + \"' ORDER BY created_at DESC LIMIT 10\" }}", "additionalFields": {} }, "id": "query-memory", "name": "Query Memory", "type": "n8n-nodes-base.postgres", "position": [0, 464], "typeVersion": 1, "credentials": { "postgres": { "id": "Sy23a0Zvp6PlFt6V", "name": "Postgres matrix_bot_memory" } } }, { "parameters": { "language": "javascript", "jsCode": "// Get Format Input from $node\nconst formatInput = $node['Format Input'].json;\n\n// Get all items from Query Memory\nconst allItems = $input.all();\nconst memoryItems = allItems.filter(item => item.json && item.json.message && item.json.sender) || [];\n\nlet contextText = '';\n\nif (memoryItems.length > 0) {\n contextText = 'Recent conversation history:\\n\\n';\n // NEWEST FIRST - iterate from 0 to end (last item is most recent)\n for (let i = 0; i < memoryItems.length; i++) {\n const record = memoryItems[i].json || {};\n if (record.message && record.sender && record.bot_response && record.created_at) {\n const ts = new Date(record.created_at).toLocaleString('ru-RU');\n const senderName = record.sender_name || record.sender;\n contextText += `[${ts}] ${senderName}: ${record.message}\\n`;\n contextText += ` → Jarvice: ${record.bot_response}\\n\\n`;\n }\n }\n}\n\n// IMPORTANT: Return ALL fields from Format Input\nreturn {\n context: contextText,\n text: formatInput.text,\n room_id: formatInput.room_id,\n sender: formatInput.sender,\n sender_name: formatInput.sender_name,\n originalMessage: formatInput.originalMessage,\n is_group: formatInput.is_group\n};" }, "id": "build-context-code", "name": "Build Context Code", "type": "n8n-nodes-base.code", "position": [160, 304], "typeVersion": 1 }, { "parameters": { "promptType": "define", "text": "={{ $json.context + 'User (' + $json.sender_name + '): ' + $json.text }}", "options": { "systemMessage": "Ты полезный AI ассистент для Matrix чата по имени Jarvice🤖.\nТы помощник Администратора (Николай).\nОтвечай кратко, используй emoji.\nОбязательно используй контекст предыдущих сообщений для лучшего ответа. Отвечай с числовыми результатами и референциями на предыдущие вычисления." } }, "id": "agent-1", "name": "AI Agent", "type": "@n8n/n8n-nodes-langchain.agent", "position": [304, 304], "typeVersion": 1.9 }, { "parameters": { "model": { "mode": "list", "value": "gpt-4o-mini" }, "options": {}, "builtInTools": {} }, "id": "openai-1", "name": "OpenAI Model", "type": "@n8n/n8n-nodes-langchain.lmChatOpenAi", "position": [352, 512], "typeVersion": 1.3, "credentials": { "openAiApi": { "id": "P1rSZeqDWn05zkK1", "name": "aitunnel" } } }, { "parameters": { "language": "javascript", "jsCode": "const ai = $json.output || $node['AI Agent'].json.output || '';\nconst buildContext = $node['Build Context Code'].json;\n\nconst roomId = buildContext.room_id || '';\nconst isGroup = buildContext.is_group || false;\nconst sender = buildContext.sender || '';\nconst senderName = buildContext.sender_name || '';\nconst originalMessage = buildContext.originalMessage || '';\n\nconst esc = (s) => {\n if (!s) return 'NULL';\n return \"'\" + String(s).replace(/'/g, \"''\") + \"'\";\n};\n\nconst query = \"INSERT INTO conversation_history (room_id, sender, sender_name, message, bot_response) VALUES (\" +\n esc(roomId) + \", \" +\n esc(sender) + \", \" +\n esc(senderName) + \", \" +\n esc(originalMessage) + \", \" +\n esc(ai) + \")\";\n\nreturn {\n message: ai,\n room_id: roomId,\n sender_id: isGroup ? sender : null,\n sender_name: isGroup ? senderName : null,\n query: query\n};" }, "id": "save-and-send", "name": "Save and Send", "type": "n8n-nodes-base.code", "position": [560, 304], "typeVersion": 1 }, { "parameters": { "method": "POST", "url": "http://matrix_bot:5000/send_message", "sendBody": true, "specifyBody": "json", "jsonBody": "={{ JSON.stringify({room_id: $json.room_id, message: $json.message, sender_id: $json.sender_id, sender_name: $json.sender_name}) }}", "options": {} }, "id": "http-1", "name": "Send to Matrix", "type": "n8n-nodes-base.httpRequest", "position": [752, 208], "typeVersion": 4.2 }, { "parameters": { "operation": "executeQuery", "query": "={{ $json.query }}", "additionalFields": {} }, "id": "save-to-db", "name": "Save to Database", "type": "n8n-nodes-base.postgres", "position": [752, 400], "typeVersion": 1, "credentials": { "postgres": { "id": "Sy23a0Zvp6PlFt6V", "name": "Postgres matrix_bot_memory" } } }, { "parameters": { "respondWith": "json", "responseBody": "={{ JSON.stringify({ success: true }) }}", "options": {} }, "id": "respond-1", "name": "Respond to Webhook", "type": "n8n-nodes-base.respondToWebhook", "position": [960, 208], "typeVersion": 1.1 } ], "connections": { "Webhook": { "main": [ [ { "node": "Format Input", "type": "main", "index": 0 } ] ] }, "AI Agent": { "main": [ [ { "node": "Save and Send", "type": "main", "index": 0 } ] ] }, "Format Input": { "main": [ [ { "node": "Query Memory", "type": "main", "index": 0 }, { "node": "Build Context Code", "type": "main", "index": 1 } ] ] }, "OpenAI Model": { "ai_languageModel": [ [ { "node": "AI Agent", "type": "ai_languageModel", "index": 0 } ] ] }, "Query Memory": { "main": [ [ { "node": "Build Context Code", "type": "main", "index": 0 } ] ] }, "Save and Send": { "main": [ [ { "node": "Send to Matrix", "type": "main", "index": 0 }, { "node": "Save to Database", "type": "main", "index": 0 } ] ] }, "Send to Matrix": { "main": [ [ { "node": "Respond to Webhook", "type": "main", "index": 0 } ] ] }, "Build Context Code": { "main": [ [ { "node": "AI Agent", "type": "main", "index": 0 } ] ] } }, "settings": { "executionOrder": "v1", "saveDataErrorExecution": "all", "saveDataSuccessExecution": "all", "saveManualExecutions": true, "saveExecutionProgress": true } } </code> ===== Процесс установки ====== ==== 1. Подготовка базы данных ==== 1. Откройте PgAdmin 2. Выполните SQL команды из раздела "Подготовка базы данных" 3. Проверьте что таблица создана ==== 2. Импорт workflow в n8n ==== 1. Скопируйте JSON из раздела "JSON для импорта в n8n" 2. В n8n нажмите "+" > "Import from URL or paste JSON" 3. Вставьте JSON и нажмите "Import" 4. Откройте импортированный workflow ==== 3. Настройка credentials ==== 1. Откройте workflow 2. Найдите node "Query Memory" и установите credentials PostgreSQL 3. Найдите node "OpenAI Model" и установите credentials OpenAI API 4. Найдите node "Save to Database" и установите credentials PostgreSQL ==== 4. Проверка и активация ==== 1. Нажмите "Test" чтобы проверить webhook URL 2. Скопируйте webhook URL 3. Настройте Matrix сервер на отправку вебхуков на этот URL 4. Активируйте workflow (кнопка "Activate") ===== Примеры использования ====== ==== Пример 1: Простой расчет ==== **Пользователь:** Сколько будет 7 + 3? **Бот:** 7 + 3 = 10 ✨ **Пользователь:** А сколько было в предыдущем вычислении? **Бот:** В предыдущем вычислении было: 7 + 3 = 10 📝 ==== Пример 2: Разговор с контекстом ==== **Пользователь:** Какой сегодня день? **Бот:** Сегодня День недели, число месяца 📅 **Пользователь:** А вчера какой был? **Бот:** Вчера был [предыдущий день] 📆 ===== Решение проблем ====== ==== Бот не отвечает ==== 1. Проверьте что workflow активен 2. Проверьте credentials PostgreSQL и OpenAI 3. Проверьте что Matrix сервер отправляет вебхуки на правильный URL 4. Посмотрите Executions в n8n - там будут ошибки ==== Сообщение: column "sender_name" does not exist ==== Выполните SQL команду: <code sql> ALTER TABLE conversation_history ADD COLUMN sender_name VARCHAR(255); </code> ==== Бот забывает историю ==== 1. Проверьте что SQL query в "Query Memory" выглядит правильно 2. Проверьте что сообщения сохраняются в БД (Look in Save to Database node output) 3. Проверьте что room_id совпадает в разных сообщениях ==== OpenAI API error ==== 1. Проверьте что API key валидный 2. Проверьте что у вас есть баланс в OpenAI 3. Проверьте что используется правильная модель (gpt-4o-mini) ===== Matrix Bot сервис (Python) ====== ==== Описание ==== Matrix Bot - это Python сервис, который подключается к Matrix серверу и: * Получает новые сообщения из чатов * Отправляет их в n8n webhook для обработки AI * Получает ответы через HTTP API и отправляет в Matrix * Поддерживает шифрованные комнаты (Megolm) * Приветствует новых пользователей в комнате "Общая" ==== Требования ==== * Python 3.9+ * python-nio (Matrix client library) * aiohttp (async HTTP client) * python-dotenv (для конфигурации) ==== Установка ==== <code bash> pip install python-nio aiohttp python-dotenv </code> ==== Конфигурация (.env файл) ==== <code> MATRIX_HOMESERVER=https://syna.digitizepro.tech MATRIX_USER=@bot:syna.digitizepro.tech MATRIX_PASSWORD=your_password_here PICKLE_KEY=your_encryption_key_here N8N_WEBHOOK_URL=http://n8n:5678/webhook/matrix-bot-ai N8N_TIMEOUT=30 HTTP_API_PORT=5000 </code> ==== Полный код бота ==== **Файл: matrix_bot.py** <code python> import asyncio import os import logging from logging.handlers import RotatingFileHandler import json import aiohttp from pathlib import Path from aiohttp import web from nio import ( AsyncClient, AsyncClientConfig, RoomMessageText, MegolmEvent, InviteMemberEvent, RoomMemberEvent, LoginResponse, KeyVerificationStart, KeyVerificationCancel, KeyVerificationKey, KeyVerificationMac, LocalProtocolError, ) # ===== Настройка логирования ===== os.makedirs('/app/logs', exist_ok=True) formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) file_handler = RotatingFileHandler( '/app/logs/matrix_bot.log', maxBytes=50*1024*1024, backupCount=10, encoding='utf-8' ) file_handler.setFormatter(formatter) file_handler.setLevel(logging.INFO) console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) console_handler.setLevel(logging.INFO) root_logger = logging.getLogger() root_logger.setLevel(logging.INFO) root_logger.addHandler(file_handler) root_logger.addHandler(console_handler) logger = logging.getLogger("matrix-bot") # ===== Конфигурация ===== HOMESERVER = os.getenv("MATRIX_HOMESERVER", "https://syna.digitizepro.tech") USER = os.getenv("MATRIX_USER", "@bot:syna.digitizepro.tech") PASSWORD = os.getenv("MATRIX_PASSWORD") STORE_PATH = "/app/store" PICKLE_KEY = os.getenv("PICKLE_KEY", "default_insecure_key_change_me") BOT_NICKNAME = "ChatBot" CREDENTIALS_FILE = os.path.join(STORE_PATH, "credentials.json") # n8n интеграция N8N_WEBHOOK_URL = os.getenv("N8N_WEBHOOK_URL", "http://n8n:5678/webhook/matrix-bot-ai") N8N_TIMEOUT = int(os.getenv("N8N_TIMEOUT", "30")) # HTTP API для ответов от n8n HTTP_API_PORT = int(os.getenv("HTTP_API_PORT", "5000")) # Настройка клиента с шифрованием config = AsyncClientConfig( encryption_enabled=True, pickle_key=PICKLE_KEY, store_sync_tokens=True ) # Глобальные переменные client = None app = None http_runner = None sync_error_count = 0 # ===== НОВОЕ: Функция очистки sync токенов ===== def clear_sync_tokens(): """Очищает sync токены для полной пересинхронизации после перезагрузки""" try: store_path = Path(STORE_PATH) # Список файлов токенов которые нужно удалить token_files = [ store_path / "sync_token.json", store_path / ".matrix-nio-sync-token" ] for token_file in token_files: if token_file.exists(): token_file.unlink() logger.info(f"🗑️ Удалён sync token файл: {token_file}") # Удаляем базу комнат для полной пересинхронизации db_file = store_path / "rooms.json" if db_file.exists(): db_file.unlink() logger.info(f"🗑️ Удалена база комнат для пересинхронизации") logger.info("✅ Sync токены очищены, будет выполнена полная пересинхронизация") except Exception as e: logger.warning(f"⚠️ Ошибка очистки sync tokens: {e}") # ===== HTTP API обработчики ===== async def send_message_handler(request): """HTTP endpoint для отправки сообщения в Matrix от n8n""" try: data = await request.json() room_id = data.get("room_id") message = data.get("message") reply_to_event_id = data.get("reply_to_event_id") sender_id = data.get("sender_id") sender_name = data.get("sender_name") if not room_id or not message: return web.json_response( {"error": "room_id and message required"}, status=400 ) logger.info(f"📤 Получен запрос отправить сообщение в комнату {room_id}") logger.info(f"💬 Текст: {message[:100]}") if reply_to_event_id: logger.info(f"↩️ Reply to event: {reply_to_event_id}") if sender_id: logger.info(f"👤 Mention user: {sender_name} ({sender_id})") # Формируем content сообщения content = { "msgtype": "m.text", "body": message } # Если есть sender_id, добавляем HTML mention if sender_id and sender_name: content["format"] = "org.matrix.custom.html" content["formatted_body"] = f'<a href="https://matrix.to/#/{sender_id}">{sender_name}</a>: {message}' # Добавляем reply-to если указан event_id if reply_to_event_id: content["m.relates_to"] = { "m.in_reply_to": { "event_id": reply_to_event_id } } if "formatted_body" not in content: content["format"] = "org.matrix.custom.html" content["formatted_body"] = message content["formatted_body"] = f'<mx-reply><blockquote><a href="https://matrix.to/#/{room_id}/{reply_to_event_id}">In reply to</a></blockquote></mx-reply>{content["formatted_body"]}' # Отправляем сообщение в Matrix try: await client.room_send( room_id=room_id, message_type="m.room.message", content=content ) logger.info(f"✅ Сообщение отправлено в {room_id}") return web.json_response({"status": "ok", "room_id": room_id}) except LocalProtocolError as e: logger.error(f"❌ Ошибка отправки (protocol): {e}") return web.json_response( {"error": f"Protocol error: {str(e)}"}, status=500 ) except Exception as e: logger.error(f"❌ Ошибка отправки: {e}") return web.json_response( {"error": f"Send error: {str(e)}"}, status=500 ) except json.JSONDecodeError: return web.json_response( {"error": "Invalid JSON"}, status=400 ) except Exception as e: logger.error(f"❌ Ошибка в handler: {e}") return web.json_response( {"error": str(e)}, status=500 ) async def health_handler(request): """Health check endpoint""" is_connected = client is not None and client.logged_in if client else False return web.json_response({ "status": "ok", "connected": is_connected, "timestamp": asyncio.get_event_loop().time() }) # ===== Функции ===== def save_credentials(device_id, access_token): """Сохраняет учётные данные в файл""" try: Path(STORE_PATH).mkdir(parents=True, exist_ok=True) with open(CREDENTIALS_FILE, "w") as f: json.dump({ "device_id": device_id, "access_token": access_token }, f) logger.info(f"✅ Credentials сохранены в {CREDENTIALS_FILE}") except Exception as e: logger.error(f"❌ Ошибка сохранения credentials: {e}") def load_credentials(): """Загружает учётные данные из файла""" try: if os.path.exists(CREDENTIALS_FILE): with open(CREDENTIALS_FILE, "r") as f: creds = json.load(f) logger.info(f"✅ Credentials загружены: device_id={creds.get('device_id')}") return creds.get("device_id"), creds.get("access_token") except Exception as e: logger.warning(f"Не удалось загрузить credentials: {e}") return None, None async def send_to_n8n(text, room_id, sender, is_dm, sender_name=None, event_id=None): """Отправляет сообщение в n8n webhook для обработки AI""" payload = { "text": text, "room_id": room_id, "sender": sender, "sender_name": sender_name or sender, "is_dm": is_dm, "event_id": event_id } try: async with aiohttp.ClientSession() as session: async with session.post( N8N_WEBHOOK_URL, json=payload, timeout=aiohttp.ClientTimeout(total=N8N_TIMEOUT) ) as resp: if resp.status == 200: result = await resp.json() logger.info(f"✅ n8n обработал сообщение") return result else: text_resp = await resp.text() logger.error(f"❌ n8n вернул ошибку {resp.status}: {text_resp[:200]}") return None except asyncio.TimeoutError: logger.error(f"⏱️ Timeout при обращении к n8n (>{N8N_TIMEOUT}s)") return None except Exception as e: logger.error(f"❌ Ошибка при отправке в n8n: {e}") return None async def message_callback(room, event): """Обработка текстовых сообщений""" logger.info(f"📨 Event type: {type(event).__name__}, sender: {event.sender}, room: {room.room_id}") # Расшифровка Megolm-сообщений if isinstance(event, MegolmEvent): logger.info(f"🔐 Decrypting megolm event...") try: event = await client.decrypt_event(event) if not event: logger.warning(f"Failed to decrypt event in {room.room_id}") return logger.info(f"✅ Decrypted successfully") except Exception as e: logger.error(f"Decrypt error: {e}") return if not isinstance(event, RoomMessageText): logger.debug(f"Ignoring non-text event: {type(event).__name__}") return if event.sender == client.user_id: logger.debug(f"Ignoring own message") return text = (event.body or "").strip() if not text: return logger.info(f"💬 Message text: {text[:100]}") logger.info(f"🆔 Event ID: {event.event_id}") # ===== ИСПРАВЛЕНО: Правильная обработка None в member_count ===== member_count = getattr(room, 'member_count', None) # Если member_count None, пробуем другие способы if member_count is None or member_count == 0: member_count = len(room.users) if hasattr(room, 'users') and room.users else 0 if member_count == 0 and hasattr(room, 'joined_count'): member_count = getattr(room, 'joined_count', 0) if member_count == 0 and hasattr(room, 'summary'): member_count = getattr(room.summary, 'joined_member_count', 0) # ===== КРИТИЧНО: Проверка что member_count не None перед сравнением ===== # Если всё ещё None (после перезагрузки/бекапа), используем default значение if member_count is None: logger.warning(f"⚠️ member_count is None (после перезагрузки?), используем default=2 (DM)") member_count = 2 logger.info(f"🔍 Room info: members={member_count}, name={room.display_name or room.name or 'None'}") # Определяем тип комнаты if member_count == 2: is_dm = True logger.info(f"🔍 Room type: DM (2 members)") elif member_count <= 1: # ← Теперь member_count никогда не будет None! is_dm = True logger.info(f"🔍 Room type: DM (fallback, members={member_count})") else: is_dm = False logger.info(f"🔍 Room type: GROUP ({member_count} members)") # Проверяем нужно ли отвечать if is_dm: cleaned = text logger.info(f"💬 DM message, sending to n8n") else: # В группе только на обращение if not text.lower().startswith(f"{BOT_NICKNAME.lower()}:"): logger.info(f"⏭️ Ignoring group message without mention") return cleaned = text[len(f"{BOT_NICKNAME}:"):].strip() if not cleaned: return logger.info(f"💬 Group mention detected, sending to n8n") # Отправляем сообщение в n8n для обработки AI logger.info(f"🚀 Sending to n8n webhook...") # Получаем имя отправителя из комнаты sender_name = None if event.sender in room.users: user = room.users[event.sender] sender_name = user.name or event.sender result = await send_to_n8n(cleaned, room.room_id, event.sender, is_dm, sender_name, event.event_id) if not result: fallback_reply = "Извините, возникла ошибка при обработке вашего сообщения. Попробуйте позже." try: await client.room_send( room_id=room.room_id, message_type="m.room.message", content={"msgtype": "m.text", "body": fallback_reply} ) logger.warning(f"📤 Отправлен fallback ответ") except Exception as e: logger.error(f"❌ Ошибка отправки fallback: {e}") async def invite_callback(room, event): """Обработка приглашений в комнаты""" try: await client.join(room.room_id) logger.info(f"✅ Присоединился к комнате: {room.room_id}") except Exception as e: logger.exception(f"❌ Ошибка join: {e}") async def member_callback(room, event): """Обработка событий присоединения/выхода пользователей""" try: if event.membership != "join": return if event.state_key == client.user_id: logger.debug(f"Ignoring own membership event") return room_name = room.display_name or room.name or "" logger.info(f"👥 Member event in room: {room_name} ({room.room_id})") if room_name.lower() != "общая": logger.debug(f"Skipping welcome message - not 'Общая' room") return new_user_id = event.state_key new_user_name = new_user_id if new_user_id in room.users: user = room.users[new_user_id] new_user_name = user.name or new_user_id logger.info(f"👋 Sending welcome message to {new_user_name} in {room_name}") plain_message = f"{new_user_name}: Добро пожаловать в чат! Я AI ChatBot и могу помочь с различными вопросами, если обратиться ко мне (в чате необходимо нажать на мое имя что бы написать мне вопрос). Приватные чаты шифруются. Срок хранения переписки на сервере одна неделя." html_message = f'<a href="https://matrix.to/#/{new_user_id}">{new_user_name}</a>: Добро пожаловать в чат! Я AI ChatBot и могу помочь с различными вопросами, если обратиться ко мне (в чате необходимо нажать на мое имя что бы написать мне вопрос). Приватные чаты шифруются. Срок хранения переписки на сервере одна неделя.' await client.room_send( room_id=room.room_id, message_type="m.room.message", content={ "msgtype": "m.text", "body": plain_message, "format": "org.matrix.custom.html", "formatted_body": html_message } ) logger.info(f"✅ Welcome message sent to {new_user_name}") except Exception as e: logger.error(f"❌ Ошибка в member_callback: {e}") logger.exception(e) async def to_device_callback(event): """Обработка to-device сообщений для обмена ключами""" logger.debug(f"To-device event: {type(event).__name__}") # ===== HTTP сервер ===== async def start_http_server(): """Запускает HTTP API сервер для получения ответов от n8n""" global app, http_runner app = web.Application() app.router.add_post('/send_message', send_message_handler) app.router.add_get('/health', health_handler) http_runner = web.AppRunner(app) await http_runner.setup() site = web.TCPSite(http_runner, '0.0.0.0', HTTP_API_PORT) await site.start() logger.info(f"🌐 HTTP API запущен на порту {HTTP_API_PORT}") logger.info(f"📡 Endpoints: /send_message (POST), /health (GET)") # ===== Основной цикл ===== async def main(): global client, sync_error_count logger.info(f"🚀 Запуск Matrix бота") logger.info(f"🏠 Homeserver: {HOMESERVER}") logger.info(f"👤 User: {USER}") logger.info(f"📡 n8n webhook: {N8N_WEBHOOK_URL}") # Загружаем сохранённые credentials device_id, access_token = load_credentials() # Создаём клиента с сохранёнными данными if device_id and access_token: client = AsyncClient( HOMESERVER, USER, config=config, store_path=STORE_PATH, device_id=device_id ) client.access_token = access_token client.user_id = USER logger.info(f"✅ Используем существующую сессию, device_id: {device_id}") else: client = AsyncClient(HOMESERVER, USER, config=config, store_path=STORE_PATH) client.user_id = USER logger.info("Выполняется первичный логин...") if not PASSWORD: logger.error("❌ MATRIX_PASSWORD не установлен в .env") return login_resp = await client.login(PASSWORD, device_name="matrix-bot") if not isinstance(login_resp, LoginResponse): logger.error(f"❌ Login failed: {login_resp}") return logger.info(f"✅ Залогинились как {USER}, device_id: {client.device_id}") save_credentials(client.device_id, client.access_token) # Загрузка crypto store try: client.load_store() logger.info("✅ Crypto store загружен") except Exception as e: logger.warning(f"Crypto store не загружен: {e}") # ===== НОВОЕ: Первичная синхронизация с обработкой next_batch ошибок ===== logger.info("⏳ Выполняется первичная синхронизация...") max_sync_retries = 3 for sync_attempt in range(max_sync_retries): try: sync_response = await client.sync(timeout=30000, full_state=True) logger.info(f"✅ Первичная синхронизация завершена") sync_error_count = 0 break except Exception as e: error_str = str(e) sync_attempt_num = sync_attempt + 1 # Проверяем на ошибку next_batch if "next_batch" in error_str or "'next_batch' is a required property" in error_str: logger.error(f"❌ Sync error (попытка {sync_attempt_num}/{max_sync_retries}): next_batch missing") logger.warning(f"🔄 Это происходит после перезагрузки - Matrix сервер вернул некорректный ответ") if sync_attempt_num < max_sync_retries: logger.info(f"🗑️ Очищаю sync токены и пробую ещё раз...") clear_sync_tokens() await asyncio.sleep(5) # Дождёмся перед следующей попыткой else: logger.error(f"❌ Не удалось выполнить синхронизацию после {max_sync_retries} попыток") logger.info("💡 Попробуем продолжить и восстановиться в основном цикле...") break else: logger.error(f"❌ Ошибка синхронизации (попытка {sync_attempt_num}/{max_sync_retries}): {e}") if sync_attempt_num < max_sync_retries: await asyncio.sleep(5) else: logger.error(f"❌ Не удалось выполнить синхронизацию после {max_sync_retries} попыток") logger.info("💡 Попробуем продолжить и восстановиться в основном цикле...") break # Загрузка ключей шифрования при необходимости if client.should_upload_keys: logger.info("📤 Загрузка ключей шифрования...") try: await client.keys_upload() logger.info("✅ Ключи шифрования загружены") except Exception as e: logger.warning(f"⚠️ Ошибка загрузки ключей: {e}") # Запрос ключей для шифрованных комнат logger.info("🔑 Запрос ключей для комнат...") try: await client.keys_query() shared_count = 0 for room_id, room in client.rooms.items(): if room.encrypted: try: await client.share_group_session(room_id, ignore_unverified_devices=True) shared_count += 1 except Exception as e: logger.warning(f"⚠️ Share session error for {room_id}: {e}") if shared_count > 0: logger.info(f"🔐 Ключи общей сессии переданы для {shared_count} шифрованных комнат") except Exception as e: if "No key query required" not in str(e): logger.warning(f"⚠️ Keys query error: {e}") # Регистрируем callbacks для новых событий client.add_event_callback(message_callback, RoomMessageText) client.add_event_callback(message_callback, MegolmEvent) client.add_event_callback(invite_callback, InviteMemberEvent) client.add_event_callback(member_callback, RoomMemberEvent) client.add_to_device_callback(to_device_callback, (KeyVerificationStart, KeyVerificationCancel, KeyVerificationKey, KeyVerificationMac)) # Запускаем HTTP сервер await start_http_server() logger.info(f"🤖 Бот полностью готов к работе!") logger.info(f"✅ Listening for messages...") try: # ===== НОВОЕ: Основной цикл с обработкой ошибок next_batch ===== consecutive_errors = 0 while True: try: await client.sync(timeout=30000) consecutive_errors = 0 # Reset при успехе except Exception as e: error_str = str(e) consecutive_errors += 1 # Проверяем на ошибку next_batch if "next_batch" in error_str or "'next_batch' is a required property" in error_str: logger.warning(f"⚠️ Sync error #{consecutive_errors}: next_batch missing (обычно после перезагрузки)") if consecutive_errors == 1: logger.info("🗑️ Первая ошибка - очищаю sync токены...") clear_sync_tokens() # Экспоненциальная задержка: 5s, 10s, 20s, 40s... delay = min(5 * (2 ** (consecutive_errors - 1)), 120) logger.info(f"⏳ Жду {delay}s перед повторной попыткой...") await asyncio.sleep(delay) if consecutive_errors > 5: logger.error("❌ Слишком много consecutive ошибок (>5), контейнер перезагружается...") break else: # Другие ошибки logger.error(f"❌ Ошибка sync #{consecutive_errors}: {e}") if consecutive_errors > 3: logger.warning(f"⚠️ {consecutive_errors} ошибок подряд, даю больше времени...") await asyncio.sleep(30) else: await asyncio.sleep(10) except KeyboardInterrupt: logger.info("⏹️ Получен сигнал завершения") finally: if http_runner: await http_runner.cleanup() if client: await client.close() logger.info("👋 Бот остановлен") if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: logger.info("Bot остановлен пользователем") except Exception as e: logger.exception(f"❌ Fatal error: {e}") </code> ==== Запуск бота ==== <code bash> # Создаём .env файл с конфигурацией cat > .env << EOF MATRIX_HOMESERVER=https://syna.digitizepro.tech MATRIX_USER=@bot:syna.digitizepro.tech MATRIX_PASSWORD=your_password PICKLE_KEY=your_encryption_key N8N_WEBHOOK_URL=http://n8n:5678/webhook/matrix-bot-ai N8N_TIMEOUT=30 HTTP_API_PORT=5000 EOF # Запускаем бот python matrix_bot.py </code> ==== Использование с Docker ==== **requirements.txt:** <code> python-nio>=0.24.0 aiohttp>=3.8.0 python-dotenv>=0.21.0 </code> **Dockerfile:** <code dockerfile> FROM python:3.11-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY matrix_bot.py . CMD ["python", "matrix_bot.py"] </code> ==== Основные функции ==== ==== Message Callback ==== Обрабатывает текстовые сообщения: * Расшифровывает Megolm-сообщения (из шифрованных комнат) * Определяет тип комнаты (DM или GROUP) * В DM отвечает на все сообщения * В GROUP отвечает только на обращение (''ChatBot: вопрос'') * Отправляет в n8n webhook ==== Send to n8n ==== Отправляет сообщение в n8n: * text - очищенный текст * room_id - ID комнаты * sender - Matrix ID отправителя * sender_name - Имя отправителя * is_dm - флаг типа комнаты * event_id - ID события ==== Send Message Handler ==== HTTP endpoint для ответов от n8n: * Получает сообщение и параметры * Добавляет HTML mention если есть sender_id * Отправляет в Matrix комнату ===== Технические детали ====== ==== Порядок сообщений в контексте ==== Query Memory возвращает сообщения в порядке DESC (новые сверху), но Build Context Code переворачивает их так чтобы **новые сообщения были в конце контекста**. Это важно чтобы AI правильно понял что является "последним" сообщением. ==== Экранирование SQL ==== В Save and Send используется функция esc() которая экранирует одинарные кавычки ('' → '''') чтобы избежать SQL инъекций. ==== Fallback значения ==== Если sender_name не пришел, используется sender ID Если сообщение не пришло, используется пустая строка Все это обрабатывается в Build Context Code и Save and Send ===== Дополнительная информация ====== **Версия Workflow:** 236 **Последнее обновление:** 2025-11-29 **Статус:** Production Ready ✅ **Contacts:** Если возникли вопросы по workflow, смотрите логи в Executions vm/matrix-bot/01-install.txt Последнее изменение: 2025/12/01 12:34 — admin