Перейти к содержимому

9. Синхронизация Офис — Касса

Общая схема

Офис — источник истины для всех справочных данных. POS-терминалы не обращаются к офису напрямую — между ними стоит Касса (облачный микросервис).

┌──────────┐ ┌──────────┐ ┌──────────────┐
│ │ Kafka │ │ REST / │ │
│ ОФИС │────────►│ КАССА │ WebSocket│ POS- │
│ (backend)│ │ (облако) │◄────────►│ ТЕРМИНАЛ │
│ │ │ │ │ │
└──────────┘ └──────────┘ └──────────────┘
Создаёт и Хранит Показывает
изменяет копию данных меню, пробивает
данные для POS чеки

Что синхронизируется

СущностьНаправлениеЧто именно
НоменклатураОфис → КассаБлюда, ингредиенты, заготовки
Группы номенклатурыОфис → КассаДерево категорий меню
ЦеныОфис → КассаЦены по подразделениям
ТехкартыОфис → КассаРецептуры для списания
Ингредиенты техкартОфис → КассаСостав рецептур
МодификаторыОфис → КассаДобавки к блюдам
СотрудникиОфис → КассаДля авторизации на POS
ПодразделенияОфис → КассаОрганизационная структура
POS-терминалыОфис → КассаНастройки терминалов
Методы оплатыОфис → КассаНаличные, карта, безнал
Складские остаткиОфис → КассаКоличество и себестоимость по складам
Чеки, сменыКасса → ОфисПродажи с терминалов

Как это работает

Шаг за шагом

┌─────────────────────────────────────────────────────────────────────┐
│ │
│ 1. ОФИС: менеджер изменил цену Латте │
│ │
│ 2. В ОДНОЙ ТРАНЗАКЦИИ: │
│ ├── Обновлена NomenclaturePrice (новая цена) │
│ ├── Инкремент version: 4 → 5 │
│ └── Вставлена запись в outbox_events: │
│ entity: "nomenclature_price" │
│ action: "UPDATED" │
│ version: 5 │
│ payload: { цена, подразделение, ... } │
│ │
│ 3. OUTBOX SCHEDULER (планировщик, каждые N секунд): │
│ ├── Забрал PENDING записи из outbox_events │
│ ├── Сформировал CloudEvent-сообщение │
│ ├── Отправил в Kafka │
│ └── Пометил запись как SENT │
│ │
│ 4. КАССА (Kafka-consumer): │
│ ├── Получила событие │
│ ├── Проверила version: входящий 5 > текущий 4 → применяем │
│ ├── Обновила свою БД │
│ └── Записала в inbound_events (revision = автоинкремент) │
│ │
│ 5. POS-ТЕРМИНАЛ (два канала получения): │
│ │
│ Канал A: WebSocket (реальное время) │
│ ├── Касса пушит событие через WebSocket │
│ └── POS мгновенно обновляет локальные данные │
│ │
│ Канал B: REST /api/sync (восстановление после офлайна) │
│ ├── POS делает GET /api/sync?last_synced_revision=142 │
│ └── Касса отдаёт все события с revision > 142 │
│ │
└─────────────────────────────────────────────────────────────────────┘

Идемпотентность

Если одно и то же событие придёт дважды — касса проверит version. Если входящий version ≤ текущего — событие пропускается (SKIPPED). Это гарантирует, что данные не «откатятся» назад.

Текущая version номенклатуры на кассе: 5
Пришло событие с version 3 → SKIPPED (старое, игнорируем)
Пришло событие с version 5 → SKIPPED (такое же, игнорируем)
Пришло событие с version 6 → APPLIED (новее, применяем)

Формат событий (CloudEvents)

Все события передаются в стандартном формате CloudEvents:

Тип события: core.company.office.<сущность>.<действие>
Примеры:
core.company.office.nomenclature.updated
core.company.office.tech_card.created
core.company.office.nomenclature_price.updated
core.company.office.employee.deleted

Роутинг по подразделениям

Не все данные нужны всем терминалам. Каждое событие содержит список подразделений, для которых оно актуально.

Событие: цена Латте изменена для «Центр»
Терминал «Центр-1»: подразделения = [Центр] → ✅ получит
Терминал «Центр-2»: подразделения = [Центр] → ✅ получит
Терминал «Парк-1»: подразделения = [Парк] → ❌ не получит

Терминал при авторизации получает JWT-токен, в котором указаны его подразделения. Касса фильтрует события по этому списку.


Два канала доставки

WebSocket — реальное время

Касса ──── WebSocket push ────► POS-терминал
• Мгновенная доставка (миллисекунды)
• Работает пока терминал онлайн
• Если терминал был офлайн — события пропущены

REST /api/sync — восстановление

POS-терминал ──── GET /api/sync?last_synced_revision=142 ────► Касса
Касса ──── все события с revision > 142 ────► POS-терминал
• Используется при включении терминала
• Догоняет пропущенные за время офлайна
• Каждое событие имеет автоинкрементный revision

Как работают вместе

Терминал включился
├── 1. Подключился к WebSocket (для новых событий)
└── 2. Запросил /api/sync?last_synced_revision=142
(догнать пропущенное за ночь)
Получил события 143, 144, 145...
Обновил last_synced_revision = 145
└── 3. Через WebSocket пришло событие 146
Применил, обновил revision = 146
└── Работает в штатном режиме

Что происходит при ошибках

СитуацияЧто делает система
Kafka недоступнаOutbox Scheduler повторит попытку позже. Данные не теряются — они в outbox_events
Касса недоступнаKafka хранит сообщения. Касса обработает их при восстановлении
POS офлайнПри включении POS догонит через /api/sync
Дубликат событияПроверка version — дубликат будет проигнорирован
Событие пришло не по порядкуversion гарантирует, что старое не перезапишет новое

Наглядная схема всех компонентов

┌──────────────────────────────────────────────────────────────────┐
│ ОФИС │
│ │
│ ┌────────────┐ ┌──────────────┐ ┌──────────────────────┐ │
│ │ Application│───►│ outbox_events│───►│ Outbox Scheduler │ │
│ │ Service │ │ (таблица БД) │ │ (забирает PENDING, │ │
│ │ │ │ │ │ шлёт в Kafka) │ │
│ └────────────┘ └──────────────┘ └──────────┬───────────┘ │
│ │ │
└───────────────────────────────────────────────────┼──────────────┘
┌─────▼─────┐
│ KAFKA │
└─────┬─────┘
┌───────────────────────────────────────────────────┼──────────────┐
│ КАССА │ │
│ │ │
│ ┌──────────────────┐ ┌───────────────┐ ┌────▼────────┐ │
│ │ REST /api/sync │◄───│inbound_events │◄──│ Kafka │ │
│ │ (для POS) │ │(с revision) │ │ Consumer │ │
│ └──────────────────┘ └───────────────┘ └─────────────┘ │
│ │
│ ┌──────────────────┐ │
│ │ WebSocket │───── push событий ────► POS-терминалы │
│ │ (реальное время)│ │
│ └──────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────┘

Обратная синхронизация: Касса → Офис

Данные идут не только из офиса на кассу, но и обратно — операционные данные с POS-терминалов возвращаются в офис.

POS-терминал ──► Касса ──► Kafka (топик: kassa.events) ──► Офис (kassa-sync модуль)

Что приходит обратно

ДанныеСобытиеЧто делает офис
Кассовая сменаcash_shift.opened / closedСохраняет в таблицу POS-смен
Чекиreceipt.created / updatedСохраняет позиции, суммы, скидки
Платежи по чекамpayment.createdСохраняет суммы по методам оплаты

Гарантии

  • Идемпотентность: все операции — UPSERT по ID сущности. Повторное событие просто перезапишет данные.
  • Транзакционность: каждое событие обрабатывается в одной транзакции.
  • Отказоустойчивость: битые сообщения логируются и пропускаются (не блокируют очередь).

Зачем офису данные с кассы

Чеки с POS ──► Отчёт по выручке (reporting модуль)
──► Акты реализации (автоматическое списание со склада)
──► Бухгалтерские проводки (выручка за день)
──► Приёмка смены управляющим