Matrix Bot AI - Полная документация

Matrix Bot AI - это интеллектуальный чат-бот для платформы Matrix, работающий на n8n. Бот запоминает историю разговоров в PostgreSQL и использует OpenAI GPT-4o-mini для генерации ответов с учетом контекста предыдущих сообщений.

Основные возможности:

  • Запоминание истории разговоров в разных комнатах
  • Использование контекста при ответах
  • Сохранение ID и имен собеседников
  • Автоматическая сортировка истории от новых к старым

1. Matrix сервер - отправляет вебхук при новом сообщении 2. n8n Workflow - обрабатывает сообщение и генерирует ответ 3. PostgreSQL - хранит историю разговоров 4. OpenAI API - генерирует ответы через GPT-4o-mini 5. Matrix Bot сервис - отправляет сообщения обратно в чат

1. Webhook приходит → Формирует данные сообщения (текст, room_id, sender, sender_name) 2. Query Memory → Получает последние 10 сообщений из истории этой комнаты 3. Build Context → Собирает контекст разговора для AI с сохранением порядка (новые сообщения в конце) 4. AI Agent → OpenAI генерирует ответ на основе контекста 5. Save and Send → Сохраняет результат в БД и отправляет сообщение в чат 6. Response → Возвращает успешный ответ на вебхук

Таблица conversation_history:

  • room_id - VARCHAR - ID комнаты Matrix
  • sender - VARCHAR - Matrix ID отправителя (@admin:domain)
  • sender_name - VARCHAR - Отображаемое имя в чате (Николай)
  • message - TEXT - Текст сообщения
  • bot_response - TEXT - Ответ бота
  • created_at - TIMESTAMP - Время создания записи

Выполните эти SQL команды в PgAdmin:

-- Создание таблицы conversation_history
CREATE TABLE conversation_history (
  id SERIAL PRIMARY KEY,
  room_id VARCHAR(255) NOT NULL,
  sender VARCHAR(255) NOT NULL,
  sender_name VARCHAR(255),
  message TEXT,
  bot_response TEXT,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
 
-- Создание индекса для быстрого поиска по room_id
CREATE INDEX idx_room_id ON conversation_history(room_id);
CREATE INDEX idx_created_at ON conversation_history(created_at DESC);
 
-- Добавление колонки sender_name если таблица уже существует
ALTER TABLE conversation_history 
ADD COLUMN sender_name VARCHAR(255);

Тип: n8n-nodes-base.webhook Назначение: Получает HTTP POST запрос от Matrix сервера Параметры:

  • Path: matrix-bot-ai
  • HTTP Method: POST
  • Response Mode: responseNode

Входящие данные:

{
  "body": {
    "text": "сообщение",
    "room_id": "!abc123:domain",
    "sender": "@user:domain",
    "sender_name": "User Name",
    "is_dm": false
  }
}

Тип: n8n-nodes-base.code (JavaScript) Назначение: Форматирует входящие данные в стандартный вид

Код:

const body = $json.body;
return {
  text: body.message || body.text,
  room_id: body.room_id,
  sender: body.sender || '',
  sender_name: body.sender_name || 'User',
  originalMessage: body.message || body.text,
  is_group: !body.is_dm
};

Выходные данные:

{
  "text": "сообщение",
  "room_id": "!abc123:domain",
  "sender": "@user:domain",
  "sender_name": "User Name",
  "originalMessage": "сообщение",
  "is_group": false
}

Тип: n8n-nodes-base.postgres Назначение: Получает историю из PostgreSQL Операция: executeQuery

SQL Query:

SELECT sender, sender_name, message, bot_response, created_at 
FROM conversation_history 
WHERE room_id = '{{room_id}}' 
ORDER BY created_at DESC 
LIMIT 10

Параметр Query (в n8n):

SELECT sender, sender_name, message, bot_response, created_at FROM conversation_history WHERE room_id = '{{ "'" + $json.room_id.replace(/'/g, "''") + "'" }}' ORDER BY created_at DESC LIMIT 10

Выходные данные: Массив последних 10 сообщений (или пусто если их нет)

Тип: n8n-nodes-base.code (JavaScript) Назначение: Собирает контекст разговора для AI из памяти

Код:

// Get Format Input from $node
const formatInput = $node['Format Input'].json;
 
// Get all items from Query Memory
const allItems = $input.all();
const memoryItems = allItems.filter(item => item.json && item.json.message && item.json.sender) || [];
 
let contextText = '';
 
if (memoryItems.length > 0) {
  contextText = 'Recent conversation history:\n\n';
  // NEWEST FIRST - iterate from 0 to end (last item is most recent)
  for (let i = 0; i < memoryItems.length; i++) {
    const record = memoryItems[i].json || {};
    if (record.message && record.sender && record.bot_response && record.created_at) {
      const ts = new Date(record.created_at).toLocaleString('ru-RU');
      const senderName = record.sender_name || record.sender;
      contextText += `[${ts}] ${senderName}: ${record.message}\n`;
      contextText += `  → Jarvice: ${record.bot_response}\n\n`;
    }
  }
}
 
// IMPORTANT: Return ALL fields from Format Input
return {
  context: contextText,
  text: formatInput.text,
  room_id: formatInput.room_id,
  sender: formatInput.sender,
  sender_name: formatInput.sender_name,
  originalMessage: formatInput.originalMessage,
  is_group: formatInput.is_group
};

Выходные данные:

{
  "context": "Recent conversation history:\n\n[время] Николай: 3+2\n  → Jarvice: 5\n\n",
  "text": "сообщение",
  "room_id": "!abc123:domain",
  "sender": "@user:domain",
  "sender_name": "User Name",
  "originalMessage": "сообщение",
  "is_group": false
}

Тип: @n8n/n8n-nodes-langchain.agent Назначение: LangChain Agent использующий OpenAI модель

Параметры:

  • Prompt Type: define
  • Text: json.text
  • System Message: (см. ниже - НЕ МЕНЯТЬ!)

System Message:

Ты полезный AI ассистент для Matrix чата по имени Jarvice🤖.
Ты помощник Администратора (Николай).
Отвечай кратко, используй emoji.
Обязательно используй контекст предыдущих сообщений для лучшего ответа.

ПРАВИЛА ЧАТА (сообщай их ТОЛЬКО если тебя о них спросили есть они или нет или просят сказать какие правила в этом чате, перечисляй только эти правила и не добавляй другие от себя).
1. Сообщения в чате сохраняются одну неделю.
2. Все сообщения старше недели удаляются.
3. В чате есть AI ассистент - может помочь с вопросами.
4. Соблюдай законодательство РФ.

Тип: @n8n/n8n-nodes-langchain.lmChatOpenAi Назначение: Подключение к OpenAI API

Параметры:

  • Model: gpt-4o-mini
  • Credentials: aitunnel (ваш OpenAI API key)

Тип: n8n-nodes-base.code (JavaScript) Назначение: Подготавливает данные для сохранения и отправки

Код:

const ai = $json.output || $node['AI Agent'].json.output || '';
const buildContext = $node['Build Context Code'].json;
 
const roomId = buildContext.room_id || '';
const isGroup = buildContext.is_group || false;
const sender = buildContext.sender || '';
const senderName = buildContext.sender_name || '';
const originalMessage = buildContext.originalMessage || '';
 
const esc = (s) => {
  if (!s) return 'NULL';
  return "'" + String(s).replace(/'/g, "''") + "'";
};
 
const query = "INSERT INTO conversation_history (room_id, sender, sender_name, message, bot_response) VALUES (" +
  esc(roomId) + ", " +
  esc(sender) + ", " +
  esc(senderName) + ", " +
  esc(originalMessage) + ", " +
  esc(ai) + ")";
 
return {
  message: ai,
  room_id: roomId,
  sender_id: isGroup ? sender : null,
  sender_name: isGroup ? senderName : null,
  query: query
};

Выходные данные:

{
  "message": "Ответ бота",
  "room_id": "!abc123:domain",
  "sender_id": "@user:domain",
  "sender_name": "User Name",
  "query": "INSERT INTO conversation_history..."
}

Тип: n8n-nodes-base.httpRequest Назначение: Отправляет ответ в Matrix через Matrix Bot сервис

Параметры:

JSON Body:

{{ JSON.stringify({room_id: $json.room_id, message: $json.message, sender_id: $json.sender_id, sender_name: $json.sender_name}) }}

Тип: n8n-nodes-base.postgres Назначение: Сохраняет разговор в PostgreSQL

Параметры:

Тип: n8n-nodes-base.respondToWebhook Назначение: Отправляет успешный ответ на вебхук

Параметры:

  • Respond With: json
  • Response Body: true

Webhook → Format Input Format Input → Query Memory (index 0) Format Input → Build Context Code (index 1) Query Memory → Build Context Code (index 0) Build Context Code → AI Agent OpenAI Model → AI Agent (ai_languageModel) AI Agent → Save and Send Save and Send → Send to Matrix Save and Send → Save to Database Send to Matrix → Respond to Webhook

Используйте этот JSON для создания workflow в n8n:

{
  "name": "Matrix Bot AI",
  "nodes": [
    {
      "parameters": {
        "httpMethod": "POST",
        "path": "matrix-bot-ai",
        "responseMode": "responseNode",
        "options": {}
      },
      "id": "webhook-1",
      "name": "Webhook",
      "type": "n8n-nodes-base.webhook",
      "position": [-400, 304],
      "webhookId": "2dad965f-08e2-412f-960a-b16c91dea686",
      "typeVersion": 2
    },
    {
      "parameters": {
        "language": "javascript",
        "jsCode": "const body = $json.body;\nreturn {\n  text: body.message || body.text,\n  room_id: body.room_id,\n  sender: body.sender || '',\n  sender_name: body.sender_name || 'User',\n  originalMessage: body.message || body.text,\n  is_group: !body.is_dm\n};"
      },
      "id": "format-input",
      "name": "Format Input",
      "type": "n8n-nodes-base.code",
      "position": [-144, 304],
      "typeVersion": 1
    },
    {
      "parameters": {
        "operation": "executeQuery",
        "query": "={{ \"SELECT sender, sender_name, message, bot_response, created_at FROM conversation_history WHERE room_id = '\" + $json.room_id.replace(/'/g, \"''\") + \"' ORDER BY created_at DESC LIMIT 10\" }}",
        "additionalFields": {}
      },
      "id": "query-memory",
      "name": "Query Memory",
      "type": "n8n-nodes-base.postgres",
      "position": [0, 464],
      "typeVersion": 1,
      "credentials": {
        "postgres": {
          "id": "Sy23a0Zvp6PlFt6V",
          "name": "Postgres matrix_bot_memory"
        }
      }
    },
    {
      "parameters": {
        "language": "javascript",
        "jsCode": "// Get Format Input from $node\nconst formatInput = $node['Format Input'].json;\n\n// Get all items from Query Memory\nconst allItems = $input.all();\nconst memoryItems = allItems.filter(item => item.json && item.json.message && item.json.sender) || [];\n\nlet contextText = '';\n\nif (memoryItems.length > 0) {\n  contextText = 'Recent conversation history:\\n\\n';\n  // NEWEST FIRST - iterate from 0 to end (last item is most recent)\n  for (let i = 0; i < memoryItems.length; i++) {\n    const record = memoryItems[i].json || {};\n    if (record.message && record.sender && record.bot_response && record.created_at) {\n      const ts = new Date(record.created_at).toLocaleString('ru-RU');\n      const senderName = record.sender_name || record.sender;\n      contextText += `[${ts}] ${senderName}: ${record.message}\\n`;\n      contextText += `  → Jarvice: ${record.bot_response}\\n\\n`;\n    }\n  }\n}\n\n// IMPORTANT: Return ALL fields from Format Input\nreturn {\n  context: contextText,\n  text: formatInput.text,\n  room_id: formatInput.room_id,\n  sender: formatInput.sender,\n  sender_name: formatInput.sender_name,\n  originalMessage: formatInput.originalMessage,\n  is_group: formatInput.is_group\n};"
      },
      "id": "build-context-code",
      "name": "Build Context Code",
      "type": "n8n-nodes-base.code",
      "position": [160, 304],
      "typeVersion": 1
    },
    {
      "parameters": {
        "promptType": "define",
        "text": "={{ $json.context + 'User (' + $json.sender_name + '): ' + $json.text }}",
        "options": {
          "systemMessage": "Ты полезный AI ассистент для Matrix чата по имени Jarvice🤖.\nТы помощник Администратора (Николай).\nОтвечай кратко, используй emoji.\nОбязательно используй контекст предыдущих сообщений для лучшего ответа. Отвечай с числовыми результатами и референциями на предыдущие вычисления."
        }
      },
      "id": "agent-1",
      "name": "AI Agent",
      "type": "@n8n/n8n-nodes-langchain.agent",
      "position": [304, 304],
      "typeVersion": 1.9
    },
    {
      "parameters": {
        "model": {
          "mode": "list",
          "value": "gpt-4o-mini"
        },
        "options": {},
        "builtInTools": {}
      },
      "id": "openai-1",
      "name": "OpenAI Model",
      "type": "@n8n/n8n-nodes-langchain.lmChatOpenAi",
      "position": [352, 512],
      "typeVersion": 1.3,
      "credentials": {
        "openAiApi": {
          "id": "P1rSZeqDWn05zkK1",
          "name": "aitunnel"
        }
      }
    },
    {
      "parameters": {
        "language": "javascript",
        "jsCode": "const ai = $json.output || $node['AI Agent'].json.output || '';\nconst buildContext = $node['Build Context Code'].json;\n\nconst roomId = buildContext.room_id || '';\nconst isGroup = buildContext.is_group || false;\nconst sender = buildContext.sender || '';\nconst senderName = buildContext.sender_name || '';\nconst originalMessage = buildContext.originalMessage || '';\n\nconst esc = (s) => {\n  if (!s) return 'NULL';\n  return \"'\" + String(s).replace(/'/g, \"''\") + \"'\";\n};\n\nconst query = \"INSERT INTO conversation_history (room_id, sender, sender_name, message, bot_response) VALUES (\" +\n  esc(roomId) + \", \" +\n  esc(sender) + \", \" +\n  esc(senderName) + \", \" +\n  esc(originalMessage) + \", \" +\n  esc(ai) + \")\";\n\nreturn {\n  message: ai,\n  room_id: roomId,\n  sender_id: isGroup ? sender : null,\n  sender_name: isGroup ? senderName : null,\n  query: query\n};"
      },
      "id": "save-and-send",
      "name": "Save and Send",
      "type": "n8n-nodes-base.code",
      "position": [560, 304],
      "typeVersion": 1
    },
    {
      "parameters": {
        "method": "POST",
        "url": "http://matrix_bot:5000/send_message",
        "sendBody": true,
        "specifyBody": "json",
        "jsonBody": "={{ JSON.stringify({room_id: $json.room_id, message: $json.message, sender_id: $json.sender_id, sender_name: $json.sender_name}) }}",
        "options": {}
      },
      "id": "http-1",
      "name": "Send to Matrix",
      "type": "n8n-nodes-base.httpRequest",
      "position": [752, 208],
      "typeVersion": 4.2
    },
    {
      "parameters": {
        "operation": "executeQuery",
        "query": "={{ $json.query }}",
        "additionalFields": {}
      },
      "id": "save-to-db",
      "name": "Save to Database",
      "type": "n8n-nodes-base.postgres",
      "position": [752, 400],
      "typeVersion": 1,
      "credentials": {
        "postgres": {
          "id": "Sy23a0Zvp6PlFt6V",
          "name": "Postgres matrix_bot_memory"
        }
      }
    },
    {
      "parameters": {
        "respondWith": "json",
        "responseBody": "={{ JSON.stringify({ success: true }) }}",
        "options": {}
      },
      "id": "respond-1",
      "name": "Respond to Webhook",
      "type": "n8n-nodes-base.respondToWebhook",
      "position": [960, 208],
      "typeVersion": 1.1
    }
  ],
  "connections": {
    "Webhook": {
      "main": [
        [
          {
            "node": "Format Input",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "AI Agent": {
      "main": [
        [
          {
            "node": "Save and Send",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Format Input": {
      "main": [
        [
          {
            "node": "Query Memory",
            "type": "main",
            "index": 0
          },
          {
            "node": "Build Context Code",
            "type": "main",
            "index": 1
          }
        ]
      ]
    },
    "OpenAI Model": {
      "ai_languageModel": [
        [
          {
            "node": "AI Agent",
            "type": "ai_languageModel",
            "index": 0
          }
        ]
      ]
    },
    "Query Memory": {
      "main": [
        [
          {
            "node": "Build Context Code",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Save and Send": {
      "main": [
        [
          {
            "node": "Send to Matrix",
            "type": "main",
            "index": 0
          },
          {
            "node": "Save to Database",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Send to Matrix": {
      "main": [
        [
          {
            "node": "Respond to Webhook",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Build Context Code": {
      "main": [
        [
          {
            "node": "AI Agent",
            "type": "main",
            "index": 0
          }
        ]
      ]
    }
  },
  "settings": {
    "executionOrder": "v1",
    "saveDataErrorExecution": "all",
    "saveDataSuccessExecution": "all",
    "saveManualExecutions": true,
    "saveExecutionProgress": true
  }
}

1. Откройте PgAdmin 2. Выполните SQL команды из раздела «Подготовка базы данных» 3. Проверьте что таблица создана

1. Скопируйте JSON из раздела «JSON для импорта в n8n» 2. В n8n нажмите «+» > «Import from URL or paste JSON» 3. Вставьте JSON и нажмите «Import» 4. Откройте импортированный workflow

1. Откройте workflow 2. Найдите node «Query Memory» и установите credentials PostgreSQL 3. Найдите node «OpenAI Model» и установите credentials OpenAI API 4. Найдите node «Save to Database» и установите credentials PostgreSQL

1. Нажмите «Test» чтобы проверить webhook URL 2. Скопируйте webhook URL 3. Настройте Matrix сервер на отправку вебхуков на этот URL 4. Активируйте workflow (кнопка «Activate»)

Пользователь: Сколько будет 7 + 3? Бот: 7 + 3 = 10 ✨

Пользователь: А сколько было в предыдущем вычислении? Бот: В предыдущем вычислении было: 7 + 3 = 10 📝

Пользователь: Какой сегодня день? Бот: Сегодня День недели, число месяца 📅

Пользователь: А вчера какой был? Бот: Вчера был [предыдущий день] 📆

1. Проверьте что workflow активен 2. Проверьте credentials PostgreSQL и OpenAI 3. Проверьте что Matrix сервер отправляет вебхуки на правильный URL 4. Посмотрите Executions в n8n - там будут ошибки

Выполните SQL команду:

ALTER TABLE conversation_history 
ADD COLUMN sender_name VARCHAR(255);

1. Проверьте что SQL query в «Query Memory» выглядит правильно 2. Проверьте что сообщения сохраняются в БД (Look in Save to Database node output) 3. Проверьте что room_id совпадает в разных сообщениях

1. Проверьте что API key валидный 2. Проверьте что у вас есть баланс в OpenAI 3. Проверьте что используется правильная модель (gpt-4o-mini)

Matrix Bot - это Python сервис, который подключается к Matrix серверу и:

  • Получает новые сообщения из чатов
  • Отправляет их в n8n webhook для обработки AI
  • Получает ответы через HTTP API и отправляет в Matrix
  • Поддерживает шифрованные комнаты (Megolm)
  • Приветствует новых пользователей в комнате «Общая»
  • Python 3.9+
  • python-nio (Matrix client library)
  • aiohttp (async HTTP client)
  • python-dotenv (для конфигурации)
pip install python-nio aiohttp python-dotenv
MATRIX_HOMESERVER=https://syna.digitizepro.tech
MATRIX_USER=@bot:syna.digitizepro.tech
MATRIX_PASSWORD=your_password_here
PICKLE_KEY=your_encryption_key_here
N8N_WEBHOOK_URL=http://n8n:5678/webhook/matrix-bot-ai
N8N_TIMEOUT=30
HTTP_API_PORT=5000

Файл: matrix_bot.py

import asyncio
import os
import logging
from logging.handlers import RotatingFileHandler
import json
import aiohttp
from pathlib import Path
from aiohttp import web
from nio import (
    AsyncClient,
    AsyncClientConfig,
    RoomMessageText,
    MegolmEvent,
    InviteMemberEvent,
    RoomMemberEvent,
    LoginResponse,
    KeyVerificationStart,
    KeyVerificationCancel,
    KeyVerificationKey,
    KeyVerificationMac,
    LocalProtocolError,
)
 
# ===== Настройка логирования =====
os.makedirs('/app/logs', exist_ok=True)
 
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
 
file_handler = RotatingFileHandler(
    '/app/logs/matrix_bot.log',
    maxBytes=50*1024*1024,
    backupCount=10,
    encoding='utf-8'
)
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.INFO)
 
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.INFO)
 
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(file_handler)
root_logger.addHandler(console_handler)
 
logger = logging.getLogger("matrix-bot")
 
# ===== Конфигурация =====
HOMESERVER = os.getenv("MATRIX_HOMESERVER", "https://syna.digitizepro.tech")
USER = os.getenv("MATRIX_USER", "@bot:syna.digitizepro.tech")
PASSWORD = os.getenv("MATRIX_PASSWORD")
STORE_PATH = "/app/store"
PICKLE_KEY = os.getenv("PICKLE_KEY", "default_insecure_key_change_me")
BOT_NICKNAME = "ChatBot"
CREDENTIALS_FILE = os.path.join(STORE_PATH, "credentials.json")
 
# n8n интеграция
N8N_WEBHOOK_URL = os.getenv("N8N_WEBHOOK_URL", "http://n8n:5678/webhook/matrix-bot-ai")
N8N_TIMEOUT = int(os.getenv("N8N_TIMEOUT", "30"))
 
# HTTP API для ответов от n8n
HTTP_API_PORT = int(os.getenv("HTTP_API_PORT", "5000"))
 
# Настройка клиента с шифрованием
config = AsyncClientConfig(
    encryption_enabled=True,
    pickle_key=PICKLE_KEY,
    store_sync_tokens=True
)
 
# Глобальные переменные
client = None
app = None
http_runner = None
sync_error_count = 0
 
# ===== НОВОЕ: Функция очистки sync токенов =====
def clear_sync_tokens():
    """Очищает sync токены для полной пересинхронизации после перезагрузки"""
    try:
        store_path = Path(STORE_PATH)
 
        # Список файлов токенов которые нужно удалить
        token_files = [
            store_path / "sync_token.json",
            store_path / ".matrix-nio-sync-token"
        ]
 
        for token_file in token_files:
            if token_file.exists():
                token_file.unlink()
                logger.info(f"🗑️ Удалён sync token файл: {token_file}")
 
        # Удаляем базу комнат для полной пересинхронизации
        db_file = store_path / "rooms.json"
        if db_file.exists():
            db_file.unlink()
            logger.info(f"🗑️ Удалена база комнат для пересинхронизации")
 
        logger.info("✅ Sync токены очищены, будет выполнена полная пересинхронизация")
 
    except Exception as e:
        logger.warning(f"⚠️ Ошибка очистки sync tokens: {e}")
 
# ===== HTTP API обработчики =====
async def send_message_handler(request):
    """HTTP endpoint для отправки сообщения в Matrix от n8n"""
    try:
        data = await request.json()
        room_id = data.get("room_id")
        message = data.get("message")
        reply_to_event_id = data.get("reply_to_event_id")
        sender_id = data.get("sender_id")
        sender_name = data.get("sender_name")
 
        if not room_id or not message:
            return web.json_response(
                {"error": "room_id and message required"},
                status=400
            )
 
        logger.info(f"📤 Получен запрос отправить сообщение в комнату {room_id}")
        logger.info(f"💬 Текст: {message[:100]}")
        if reply_to_event_id:
            logger.info(f"↩️ Reply to event: {reply_to_event_id}")
        if sender_id:
            logger.info(f"👤 Mention user: {sender_name} ({sender_id})")
 
        # Формируем content сообщения
        content = {
            "msgtype": "m.text",
            "body": message
        }
 
        # Если есть sender_id, добавляем HTML mention
        if sender_id and sender_name:
            content["format"] = "org.matrix.custom.html"
            content["formatted_body"] = f'<a href="https://matrix.to/#/{sender_id}">{sender_name}</a>: {message}'
 
        # Добавляем reply-to если указан event_id
        if reply_to_event_id:
            content["m.relates_to"] = {
                "m.in_reply_to": {
                    "event_id": reply_to_event_id
                }
            }
            if "formatted_body" not in content:
                content["format"] = "org.matrix.custom.html"
                content["formatted_body"] = message
            content["formatted_body"] = f'<mx-reply><blockquote><a href="https://matrix.to/#/{room_id}/{reply_to_event_id}">In reply to</a></blockquote></mx-reply>{content["formatted_body"]}'
 
        # Отправляем сообщение в Matrix
        try:
            await client.room_send(
                room_id=room_id,
                message_type="m.room.message",
                content=content
            )
            logger.info(f"✅ Сообщение отправлено в {room_id}")
            return web.json_response({"status": "ok", "room_id": room_id})
        except LocalProtocolError as e:
            logger.error(f"❌ Ошибка отправки (protocol): {e}")
            return web.json_response(
                {"error": f"Protocol error: {str(e)}"},
                status=500
            )
        except Exception as e:
            logger.error(f"❌ Ошибка отправки: {e}")
            return web.json_response(
                {"error": f"Send error: {str(e)}"},
                status=500
            )
 
    except json.JSONDecodeError:
        return web.json_response(
            {"error": "Invalid JSON"},
            status=400
        )
    except Exception as e:
        logger.error(f"❌ Ошибка в handler: {e}")
        return web.json_response(
            {"error": str(e)},
            status=500
        )
 
async def health_handler(request):
    """Health check endpoint"""
    is_connected = client is not None and client.logged_in if client else False
    return web.json_response({
        "status": "ok",
        "connected": is_connected,
        "timestamp": asyncio.get_event_loop().time()
    })
 
# ===== Функции =====
def save_credentials(device_id, access_token):
    """Сохраняет учётные данные в файл"""
    try:
        Path(STORE_PATH).mkdir(parents=True, exist_ok=True)
        with open(CREDENTIALS_FILE, "w") as f:
            json.dump({
                "device_id": device_id,
                "access_token": access_token
            }, f)
        logger.info(f"✅ Credentials сохранены в {CREDENTIALS_FILE}")
    except Exception as e:
        logger.error(f"❌ Ошибка сохранения credentials: {e}")
 
def load_credentials():
    """Загружает учётные данные из файла"""
    try:
        if os.path.exists(CREDENTIALS_FILE):
            with open(CREDENTIALS_FILE, "r") as f:
                creds = json.load(f)
            logger.info(f"✅ Credentials загружены: device_id={creds.get('device_id')}")
            return creds.get("device_id"), creds.get("access_token")
    except Exception as e:
        logger.warning(f"Не удалось загрузить credentials: {e}")
    return None, None
 
async def send_to_n8n(text, room_id, sender, is_dm, sender_name=None, event_id=None):
    """Отправляет сообщение в n8n webhook для обработки AI"""
    payload = {
        "text": text,
        "room_id": room_id,
        "sender": sender,
        "sender_name": sender_name or sender,
        "is_dm": is_dm,
        "event_id": event_id
    }
 
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                N8N_WEBHOOK_URL,
                json=payload,
                timeout=aiohttp.ClientTimeout(total=N8N_TIMEOUT)
            ) as resp:
                if resp.status == 200:
                    result = await resp.json()
                    logger.info(f"✅ n8n обработал сообщение")
                    return result
                else:
                    text_resp = await resp.text()
                    logger.error(f"❌ n8n вернул ошибку {resp.status}: {text_resp[:200]}")
                    return None
    except asyncio.TimeoutError:
        logger.error(f"⏱️ Timeout при обращении к n8n (>{N8N_TIMEOUT}s)")
        return None
    except Exception as e:
        logger.error(f"❌ Ошибка при отправке в n8n: {e}")
        return None
 
async def message_callback(room, event):
    """Обработка текстовых сообщений"""
    logger.info(f"📨 Event type: {type(event).__name__}, sender: {event.sender}, room: {room.room_id}")
 
    # Расшифровка Megolm-сообщений
    if isinstance(event, MegolmEvent):
        logger.info(f"🔐 Decrypting megolm event...")
        try:
            event = await client.decrypt_event(event)
            if not event:
                logger.warning(f"Failed to decrypt event in {room.room_id}")
                return
            logger.info(f"✅ Decrypted successfully")
        except Exception as e:
            logger.error(f"Decrypt error: {e}")
            return
 
    if not isinstance(event, RoomMessageText):
        logger.debug(f"Ignoring non-text event: {type(event).__name__}")
        return
 
    if event.sender == client.user_id:
        logger.debug(f"Ignoring own message")
        return
 
    text = (event.body or "").strip()
    if not text:
        return
 
    logger.info(f"💬 Message text: {text[:100]}")
    logger.info(f"🆔 Event ID: {event.event_id}")
 
    # ===== ИСПРАВЛЕНО: Правильная обработка None в member_count =====
    member_count = getattr(room, 'member_count', None)
 
    # Если member_count None, пробуем другие способы
    if member_count is None or member_count == 0:
        member_count = len(room.users) if hasattr(room, 'users') and room.users else 0
 
    if member_count == 0 and hasattr(room, 'joined_count'):
        member_count = getattr(room, 'joined_count', 0)
 
    if member_count == 0 and hasattr(room, 'summary'):
        member_count = getattr(room.summary, 'joined_member_count', 0)
 
    # ===== КРИТИЧНО: Проверка что member_count не None перед сравнением =====
    # Если всё ещё None (после перезагрузки/бекапа), используем default значение
    if member_count is None:
        logger.warning(f"⚠️ member_count is None (после перезагрузки?), используем default=2 (DM)")
        member_count = 2
 
    logger.info(f"🔍 Room info: members={member_count}, name={room.display_name or room.name or 'None'}")
 
    # Определяем тип комнаты
    if member_count == 2:
        is_dm = True
        logger.info(f"🔍 Room type: DM (2 members)")
    elif member_count <= 1:  # ← Теперь member_count никогда не будет None!
        is_dm = True
        logger.info(f"🔍 Room type: DM (fallback, members={member_count})")
    else:
        is_dm = False
        logger.info(f"🔍 Room type: GROUP ({member_count} members)")
 
    # Проверяем нужно ли отвечать
    if is_dm:
        cleaned = text
        logger.info(f"💬 DM message, sending to n8n")
    else:
        # В группе только на обращение
        if not text.lower().startswith(f"{BOT_NICKNAME.lower()}:"):
            logger.info(f"⏭️ Ignoring group message without mention")
            return
        cleaned = text[len(f"{BOT_NICKNAME}:"):].strip()
        if not cleaned:
            return
        logger.info(f"💬 Group mention detected, sending to n8n")
 
    # Отправляем сообщение в n8n для обработки AI
    logger.info(f"🚀 Sending to n8n webhook...")
 
    # Получаем имя отправителя из комнаты
    sender_name = None
    if event.sender in room.users:
        user = room.users[event.sender]
        sender_name = user.name or event.sender
 
    result = await send_to_n8n(cleaned, room.room_id, event.sender, is_dm, sender_name, event.event_id)
 
    if not result:
        fallback_reply = "Извините, возникла ошибка при обработке вашего сообщения. Попробуйте позже."
        try:
            await client.room_send(
                room_id=room.room_id,
                message_type="m.room.message",
                content={"msgtype": "m.text", "body": fallback_reply}
            )
            logger.warning(f"📤 Отправлен fallback ответ")
        except Exception as e:
            logger.error(f"❌ Ошибка отправки fallback: {e}")
 
async def invite_callback(room, event):
    """Обработка приглашений в комнаты"""
    try:
        await client.join(room.room_id)
        logger.info(f"✅ Присоединился к комнате: {room.room_id}")
    except Exception as e:
        logger.exception(f"❌ Ошибка join: {e}")
 
async def member_callback(room, event):
    """Обработка событий присоединения/выхода пользователей"""
    try:
        if event.membership != "join":
            return
 
        if event.state_key == client.user_id:
            logger.debug(f"Ignoring own membership event")
            return
 
        room_name = room.display_name or room.name or ""
        logger.info(f"👥 Member event in room: {room_name} ({room.room_id})")
 
        if room_name.lower() != "общая":
            logger.debug(f"Skipping welcome message - not 'Общая' room")
            return
 
        new_user_id = event.state_key
        new_user_name = new_user_id
 
        if new_user_id in room.users:
            user = room.users[new_user_id]
            new_user_name = user.name or new_user_id
 
        logger.info(f"👋 Sending welcome message to {new_user_name} in {room_name}")
 
        plain_message = f"{new_user_name}: Добро пожаловать в чат! Я AI ChatBot и могу помочь с различными вопросами, если обратиться ко мне (в чате необходимо нажать на мое имя что бы написать мне вопрос). Приватные чаты шифруются. Срок хранения переписки на сервере одна неделя."
 
        html_message = f'<a href="https://matrix.to/#/{new_user_id}">{new_user_name}</a>: Добро пожаловать в чат! Я AI ChatBot и могу помочь с различными вопросами, если обратиться ко мне (в чате необходимо нажать на мое имя что бы написать мне вопрос). Приватные чаты шифруются. Срок хранения переписки на сервере одна неделя.'
 
        await client.room_send(
            room_id=room.room_id,
            message_type="m.room.message",
            content={
                "msgtype": "m.text",
                "body": plain_message,
                "format": "org.matrix.custom.html",
                "formatted_body": html_message
            }
        )
 
        logger.info(f"✅ Welcome message sent to {new_user_name}")
 
    except Exception as e:
        logger.error(f"❌ Ошибка в member_callback: {e}")
        logger.exception(e)
 
async def to_device_callback(event):
    """Обработка to-device сообщений для обмена ключами"""
    logger.debug(f"To-device event: {type(event).__name__}")
 
# ===== HTTP сервер =====
async def start_http_server():
    """Запускает HTTP API сервер для получения ответов от n8n"""
    global app, http_runner
 
    app = web.Application()
    app.router.add_post('/send_message', send_message_handler)
    app.router.add_get('/health', health_handler)
 
    http_runner = web.AppRunner(app)
    await http_runner.setup()
    site = web.TCPSite(http_runner, '0.0.0.0', HTTP_API_PORT)
    await site.start()
    logger.info(f"🌐 HTTP API запущен на порту {HTTP_API_PORT}")
    logger.info(f"📡 Endpoints: /send_message (POST), /health (GET)")
 
# ===== Основной цикл =====
async def main():
    global client, sync_error_count
 
    logger.info(f"🚀 Запуск Matrix бота")
    logger.info(f"🏠 Homeserver: {HOMESERVER}")
    logger.info(f"👤 User: {USER}")
    logger.info(f"📡 n8n webhook: {N8N_WEBHOOK_URL}")
 
    # Загружаем сохранённые credentials
    device_id, access_token = load_credentials()
 
    # Создаём клиента с сохранёнными данными
    if device_id and access_token:
        client = AsyncClient(
            HOMESERVER, 
            USER, 
            config=config, 
            store_path=STORE_PATH,
            device_id=device_id
        )
        client.access_token = access_token
        client.user_id = USER
        logger.info(f"✅ Используем существующую сессию, device_id: {device_id}")
    else:
        client = AsyncClient(HOMESERVER, USER, config=config, store_path=STORE_PATH)
        client.user_id = USER
        logger.info("Выполняется первичный логин...")
 
        if not PASSWORD:
            logger.error("❌ MATRIX_PASSWORD не установлен в .env")
            return
 
        login_resp = await client.login(PASSWORD, device_name="matrix-bot")
        if not isinstance(login_resp, LoginResponse):
            logger.error(f"❌ Login failed: {login_resp}")
            return
        logger.info(f"✅ Залогинились как {USER}, device_id: {client.device_id}")
 
        save_credentials(client.device_id, client.access_token)
 
    # Загрузка crypto store
    try:
        client.load_store()
        logger.info("✅ Crypto store загружен")
    except Exception as e:
        logger.warning(f"Crypto store не загружен: {e}")
 
    # ===== НОВОЕ: Первичная синхронизация с обработкой next_batch ошибок =====
    logger.info("⏳ Выполняется первичная синхронизация...")
    max_sync_retries = 3
    for sync_attempt in range(max_sync_retries):
        try:
            sync_response = await client.sync(timeout=30000, full_state=True)
            logger.info(f"✅ Первичная синхронизация завершена")
            sync_error_count = 0
            break
 
        except Exception as e:
            error_str = str(e)
            sync_attempt_num = sync_attempt + 1
 
            # Проверяем на ошибку next_batch
            if "next_batch" in error_str or "'next_batch' is a required property" in error_str:
                logger.error(f"❌ Sync error (попытка {sync_attempt_num}/{max_sync_retries}): next_batch missing")
                logger.warning(f"🔄 Это происходит после перезагрузки - Matrix сервер вернул некорректный ответ")
 
                if sync_attempt_num < max_sync_retries:
                    logger.info(f"🗑️ Очищаю sync токены и пробую ещё раз...")
                    clear_sync_tokens()
                    await asyncio.sleep(5)  # Дождёмся перед следующей попыткой
                else:
                    logger.error(f"❌ Не удалось выполнить синхронизацию после {max_sync_retries} попыток")
                    logger.info("💡 Попробуем продолжить и восстановиться в основном цикле...")
                    break
            else:
                logger.error(f"❌ Ошибка синхронизации (попытка {sync_attempt_num}/{max_sync_retries}): {e}")
                if sync_attempt_num < max_sync_retries:
                    await asyncio.sleep(5)
                else:
                    logger.error(f"❌ Не удалось выполнить синхронизацию после {max_sync_retries} попыток")
                    logger.info("💡 Попробуем продолжить и восстановиться в основном цикле...")
                    break
 
    # Загрузка ключей шифрования при необходимости
    if client.should_upload_keys:
        logger.info("📤 Загрузка ключей шифрования...")
        try:
            await client.keys_upload()
            logger.info("✅ Ключи шифрования загружены")
        except Exception as e:
            logger.warning(f"⚠️ Ошибка загрузки ключей: {e}")
 
    # Запрос ключей для шифрованных комнат
    logger.info("🔑 Запрос ключей для комнат...")
    try:
        await client.keys_query()
        shared_count = 0
        for room_id, room in client.rooms.items():
            if room.encrypted:
                try:
                    await client.share_group_session(room_id, ignore_unverified_devices=True)
                    shared_count += 1
                except Exception as e:
                    logger.warning(f"⚠️ Share session error for {room_id}: {e}")
        if shared_count > 0:
            logger.info(f"🔐 Ключи общей сессии переданы для {shared_count} шифрованных комнат")
    except Exception as e:
        if "No key query required" not in str(e):
            logger.warning(f"⚠️ Keys query error: {e}")
 
    # Регистрируем callbacks для новых событий
    client.add_event_callback(message_callback, RoomMessageText)
    client.add_event_callback(message_callback, MegolmEvent)
    client.add_event_callback(invite_callback, InviteMemberEvent)
    client.add_event_callback(member_callback, RoomMemberEvent)
    client.add_to_device_callback(to_device_callback,
                                  (KeyVerificationStart,
                                   KeyVerificationCancel,
                                   KeyVerificationKey,
                                   KeyVerificationMac))
 
    # Запускаем HTTP сервер
    await start_http_server()
 
    logger.info(f"🤖 Бот полностью готов к работе!")
    logger.info(f"✅ Listening for messages...")
 
    try:
        # ===== НОВОЕ: Основной цикл с обработкой ошибок next_batch =====
        consecutive_errors = 0
        while True:
            try:
                await client.sync(timeout=30000)
                consecutive_errors = 0  # Reset при успехе
 
            except Exception as e:
                error_str = str(e)
                consecutive_errors += 1
 
                # Проверяем на ошибку next_batch
                if "next_batch" in error_str or "'next_batch' is a required property" in error_str:
                    logger.warning(f"⚠️ Sync error #{consecutive_errors}: next_batch missing (обычно после перезагрузки)")
 
                    if consecutive_errors == 1:
                        logger.info("🗑️ Первая ошибка - очищаю sync токены...")
                        clear_sync_tokens()
 
                    # Экспоненциальная задержка: 5s, 10s, 20s, 40s...
                    delay = min(5 * (2 ** (consecutive_errors - 1)), 120)
                    logger.info(f"⏳ Жду {delay}s перед повторной попыткой...")
                    await asyncio.sleep(delay)
 
                    if consecutive_errors > 5:
                        logger.error("❌ Слишком много consecutive ошибок (>5), контейнер перезагружается...")
                        break
                else:
                    # Другие ошибки
                    logger.error(f"❌ Ошибка sync #{consecutive_errors}: {e}")
 
                    if consecutive_errors > 3:
                        logger.warning(f"⚠️ {consecutive_errors} ошибок подряд, даю больше времени...")
                        await asyncio.sleep(30)
                    else:
                        await asyncio.sleep(10)
 
    except KeyboardInterrupt:
        logger.info("⏹️ Получен сигнал завершения")
    finally:
        if http_runner:
            await http_runner.cleanup()
        if client:
            await client.close()
        logger.info("👋 Бот остановлен")
 
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Bot остановлен пользователем")
    except Exception as e:
        logger.exception(f"❌ Fatal error: {e}")
# Создаём .env файл с конфигурацией
cat > .env << EOF
MATRIX_HOMESERVER=https://syna.digitizepro.tech
MATRIX_USER=@bot:syna.digitizepro.tech
MATRIX_PASSWORD=your_password
PICKLE_KEY=your_encryption_key
N8N_WEBHOOK_URL=http://n8n:5678/webhook/matrix-bot-ai
N8N_TIMEOUT=30
HTTP_API_PORT=5000
EOF
 
# Запускаем бот
python matrix_bot.py

requirements.txt:

python-nio>=0.24.0
aiohttp>=3.8.0
python-dotenv>=0.21.0

Dockerfile:

FROM python:3.11-slim
 
WORKDIR /app
 
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
 
COPY matrix_bot.py .
 
CMD ["python", "matrix_bot.py"]

Обрабатывает текстовые сообщения:

  • Расшифровывает Megolm-сообщения (из шифрованных комнат)
  • Определяет тип комнаты (DM или GROUP)
  • В DM отвечает на все сообщения
  • В GROUP отвечает только на обращение (ChatBot: вопрос)
  • Отправляет в n8n webhook

Отправляет сообщение в n8n:

  • text - очищенный текст
  • room_id - ID комнаты
  • sender - Matrix ID отправителя
  • sender_name - Имя отправителя
  • is_dm - флаг типа комнаты
  • event_id - ID события

HTTP endpoint для ответов от n8n:

  • Получает сообщение и параметры
  • Добавляет HTML mention если есть sender_id
  • Отправляет в Matrix комнату

Query Memory возвращает сообщения в порядке DESC (новые сверху), но Build Context Code переворачивает их так чтобы новые сообщения были в конце контекста. Это важно чтобы AI правильно понял что является «последним» сообщением.

В Save and Send используется функция esc() которая экранирует одинарные кавычки ('') чтобы избежать SQL инъекций.

Если sender_name не пришел, используется sender ID Если сообщение не пришло, используется пустая строка Все это обрабатывается в Build Context Code и Save and Send

Версия Workflow: 236 Последнее обновление: 2025-11-29 Статус: Production Ready ✅

Contacts: Если возникли вопросы по workflow, смотрите логи в Executions

  • vm/matrix-bot/01-install.txt
  • Последнее изменение: 2025/12/01 12:34
  • admin