Архитектура распределённой системы синхронизации данных (Офис ↔ Касса ↔ POS)
Источник истины. Единый кросс-системный документ архитектуры Locali (свод office + kassa). Не копировать в репозитории сервисов — ссылаться отсюда. Ссылки на код даны относительно репозитория соответствующего сервиса.
Система состоит из трёх основных компонентов, взаимодействующих через Kafka и REST API, с использованием Outbox-паттерна, CloudEvents-протокола и revision/version-based синхронизации.
Общая схема взаимодействия
┌──────────┐ Kafka ┌──────────┐ REST API / WebSocket ┌──────────┐│ │ ────────────────────► │ │ ────────────────────────────►│ ││ ОФИС │ nomenclature, │ КАССА │ /api/inbox, │ POS ││ │ employee │ (Kassa) │ WebSocket push │(Терминал)││ │ ◄──────────────────── │ │ ◄────────────────────────────│ │└──────────┘ Kafka └──────────┘ POST /api/receipts и др. └──────────┘ │ │ Касса — единственная точка контакта для POS. │ /api/inbox отдаётся из БД кассы (не проксируется в офис). │ DDL локальной SQLite и маппинг event_type → SQL живут │ внутри бандла POS (registry.ts) — касса схему по сети не отдаёт.Авторизация и корпорации
corporationId не передаётся в эндпоинты — берётся из JWT текущего пользователя. У User есть флаг isAdmin: супер-админу доступен эндпоинт switch-corporation, который выдаёт новый JWT с нужным corporationId, после чего админ получает данные другой корпорации через те же эндпоинты.
Конвенция имён Kafka-топиков
Имена топиков — иерархические, разделитель — точка. Базовый шаблон: <сервис>.events, при необходимости — <сервис>.events.<подтип>. Имена топиков, на которые опирается приложение:
office.events— исходящие события офиса (константаOFFICE_OUTBOX_TOPICв src/common/ports/outbox.port.ts).kassa.events— входящие события кассы для офиса (envKAFKA_INBOUND_TOPIC, дефолт —kassa.events).
Запрещено:
- Использовать дефис (
-) как разделитель (office-events,kassa-eventsи т.п.). - Смешивать
.и_в одном имени.
Почему именно так. Kafka в JMX-метриках заменяет и ., и _ на _, поэтому смесь разделителей в разных топиках даёт коллизию имён метрик. Дефис как разделитель уровней допустим в Kafka, но конвенция в проекте — точка, чтобы имена единообразно «сворачивались» в иерархию (<сервис>.events.<подтип>) и не пересекались с дефисами внутри <сервис> или <подтип>.
При добавлении нового топика — следовать этой схеме. Расхождение имени топика на стороне продюсера и консьюмера (например, продюсер пишет в service-events, консьюмер слушает service.events) приводит к молчаливой потере событий — менять имя нужно скоординированно с продюсером.
Конвенция CloudEvents source
Поле source в CloudEvents-конверте идентифицирует сервис-продюсер. Шаблон — <сервис>-service (дефис как разделитель — это идентификатор сервиса, не имя топика):
office-service— события, опубликованные офисом (литерал зашит вCloudEventBuilder— src/common/integration/cloud-event.builder.ts).kassa-service— события, опубликованные кассой.
Значение source используется консьюмером для фильтрации/маршрутизации и должно меняться скоординированно между продюсером и консьюмером.
1. ОФИС (Office)
Офис — источник истины для справочных данных (номенклатура, сотрудники и т.д.). Он генерирует события при изменении данных и публикует их в Kafka через Outbox-паттерн.
1.1. Генерация событий
Application-сервис пишет в outbox_events напрямую в той же транзакции, что и бизнес-операцию. Не через EventListener — это гарантирует атомарность:
const entity = await this.transactionManager.run(async (tx) => { const updated = await this.repository.update(id, { ...data, version: current.version + 1 }, tx); await this.outboxRepository.create({ eventType: 'core.company.office.nomenclature.updated', entityType: 'nomenclature', entityId: id, action: 'updated', payload: OutboxMapper.toPayload(updated), status: OutboxStatus.PENDING, }, tx); return updated;});
// Доменное событие — ПОСЛЕ транзакции (для внутренних обработчиков, не для outbox)await this.eventEmitter.emitAsync('nomenclature.updated', ...);Важно: EventEmitter2.emitAsync() вызывается после transactionManager.run() — для внутренних подписчиков (инвалидация кэша, CQRS-саги и т.д.). Outbox-запись делается внутри транзакции сервисом, не слушателем.
1.2. Таблица outbox_events
┌─────────────────────────────────────────┐│ outbox_events │├─────────────┬───────────────────────────┤│ PK │ UniqueID │├─────────────┼───────────────────────────┤│ Event │ event_type ││ metadata │ entity_type ││ │ entity_id │├─────────────┼───────────────────────────┤│ Полный │ payload (JSON) ││ payload │ status ││ Статус │ last_error ││ публикации │ attempts ││ │ created_at ││ │ published_at │└─────────────┴───────────────────────────┘1.3. Конвенция именования event_type
Полный event_type формируется по конвенции: prefix + entity_type + . + action.
Каждый источник имеет свой префикс:
- Офис →
core.company.office. - Касса →
core.company.kassa.
Примеры:
entity_type: "nomenclature",action: "updated"→core.company.office.nomenclature.updatedentity_type: "employee",action: "created"→core.company.office.employee.createdentity_type: "receipt",action: "created"→core.company.kassa.receipt.createdentity_type: "cash_shift",action: "closed"→core.company.kassa.cash_shift.closedentity_type: "attendance",action: "opened"→core.company.kassa.attendance.openedentity_type: "attendance",action: "closed"→core.company.kassa.attendance.closed
Эта конвенция едина для:
- Kafka-сообщений (поле
typeв CloudEvents) - Ответа
/api/inbox(касса отдаётevent_typeготовым, POS использует его как ключ роутинга в локальный маппинг handler-ов)
Префиксы (OFFICE_EVENT_TYPE_PREFIX, KASSA_EVENT_TYPE_PREFIX) и каталог известных event_type (EVENT_TYPES.*) — в пакете @locali/office-contracts. Касса импортирует константы из пакета, у себя локально префиксные строки не объявляет. Офис фиксирует канонические значения в src/common/ports/outbox.port.ts (OFFICE_EVENT_TYPE_PREFIX, OFFICE_OUTBOX_TOPIC).
Миграция: текущий код использует устаревший формат event names (organization.subdivision.changed, accounting.deposit-withdrawal-type.changed). При переходе на Outbox-паттерн все event_type должны быть приведены к конвенции выше. entity_type — snake_case, action — конкретное действие (created/updated/deleted), не changed.
1.4. Версионирование сущностей (version)
Каждая синхронизируемая сущность в офисе хранит поле version — версию сущности, которая увеличивается при каждом изменении:
┌────────────────────┐│ subdivisions │ (пример любой синхронизируемой сущности)├──────┬─────────────┤│ PK │ UniqueID │├──────┼─────────────┤│ │ ... │ (бизнес-поля)│ │ version │ ◄── версия сущности (инкрементируется при каждом изменении)└──────┴─────────────┘Это поле:
- Хранится внутри
payloadсущности. В CloudEvents-сообщениях извлекается из payload на уровеньdata.version(не дублируется — вdata.payloadполеversionне повторяется) - Используется для идемпотентной обработки на стороне получателя (касса): если входящий
version≤ текущего в БД кассы — событие пропускается (SKIPPED)
Помимо version, офис передаёт в CloudEvent data.subdivision_ids — массив ID подразделений, которым принадлежит сущность (одна сущность может относиться к нескольким подразделениям). Касса сохраняет subdivisionIds: String[] в inbound_events и использует массив для роутинга события терминалам (см. разделы 2.2, 2.5, 2.8). Поле передаётся на верхнем уровне data (рядом с version и payload), а не внутри payload.
1.5. Outbox-паттерн (публикация в Kafka)
outbox_events ──► Outbox Scheduler ──► Producer ──► Kafka broker (Topic: office.events)
Жизненный цикл статуса:
PENDING ──► PUBLISHING ──► PUBLISHED ├──► PENDING (retry с exponential backoff, attempts++) └──► FAILED (терминал: attempts >= MAX_ATTEMPTS)Outbox Scheduler периодически опрашивает outbox_events, атомарно claim-ит PENDING-записи (UPDATE ... WHERE status='PENDING' RETURNING attempts — при нескольких инстансах publisher-а событие возьмёт ровно один), отправляет через publishOrThrow в Kafka. При успехе — PUBLISHED; при ошибке конкретного события — возврат в PENDING с exponential backoff, после MAX_ATTEMPTS — терминальный FAILED. Ошибки уровня соединения с брокером попытку не сжигают: событие возвращается в PENDING без учёта attempt, тик прерывается. При KAFKA_ENABLED != true публикация не запускается — события остаются в PENDING. Зависшие PUBLISHING-записи раз в минуту возвращает в PENDING cron resetStuckPublishing (исчерпавшие лимит — сразу в FAILED).
FIFO-гейт по агрегату (LOCALIOFFICE-1109, К2). Выборка PENDING-батча исключает событие, если у того же partition_key существует более раннее (по created_at, tie-break id) НЕопубликованное событие (NOT EXISTS в findPendingBatch, partial-индекс outbox_events_unpublished_partition_key_idx). Агрегат публикуется строго в порядке создания или стоит целиком: backoff/FAILED «головы» блокирует «хвост», ретраи не переупорядочивают события внутри Kafka-партиции. Тик публикатора дополнительно сериализован между репликами через Postgres advisory lock (pg_try_advisory_xact_lock, фиксированный ключ; занятый лок — тик пропускается).
Admin replay (LOCALIOFFICE-1109, К8). FAILED outbox-события и FAILED/DEAD inbound-записи возвращаются в работу без ручного SQL: GET /api/admin/sync/{outbox|inbound}/failed, POST .../failed/:id/requeue, POST .../failed/requeue-by-entity-type (только для администраторов). Метрики синхронизации: гейджи office_outbox_events / office_inbound_events (бэклог по статусам), office_outbox_stalled_aggregates (агрегаты, чьё старейшее неопубликованное событие старше 60 с), office_outbox_oldest_pending_age_seconds / office_inbox_oldest_pending_age_seconds (возраст старейшей необработанной записи), counter office_contract_schema_drift_total (входящий CloudEvent объявил dataschema с версией @locali/office-contracts новее локальной; сам исходящий dataschema пишет publisher).
1.6. Приём событий (Consumer)
Офис имеет Consumer (src/kassa-sync/), получающий события из Kafka-топика kassa.events и применяющий их к офисным сущностям через таблицу inbound_events (идемпотентность по (entityType, entityId, kafkaEventId)). «Битые» сообщения (не-JSON, невалидный envelope: не-UUID id/subject, отсутствие data, переполнение полей) consumer пропускает с error-логом и коммитом offset-а — они не должны блокировать партицию; инфраструктурные ошибки (БД недоступна при INSERT) пробрасываются — offset не коммитится, Kafka повторяет доставку, событие не теряется. BullMQ-очередь kassa-inbound обрабатывает PENDING-записи: KassaInboundProcessor в одной транзакции вызывает KassaEventDispatcher.apply(record, tx) и inboundRepo.markFinalized(...). На исключение — FAILED + nextRetryAt, забор fallback-cron’ом. Событие неизвестного типа (рассинхрон версий касса/офис при деплое) не финализируется как SKIPPED, а откладывается (deferRetry: возврат в PENDING, nextRetryAt +1 час, attempts сбрасываются) — применится после деплоя обработчика. Исключение — типы, которые офис осознанно не потребляет (IGNORED_KASSA_EVENT_TYPES в KassaEventDispatcher: легаси receipt.created/receipt.updated вне контракта): они финализируются терминальным SKIPPED сразу, иначе вечно держали бы алерт возраста inbound-бэклога (LOCALIOFFICE-1111); CI-проверка контрактов требует, чтобы каждый kassa-тип пакета был либо обработан, либо явно перечислен в этом списке. Ожидание «родителя» при out-of-order доставке (handler бросил DependencyMissingError: например, receipt.fiscalized раньше order.*, receipt.returned раньше оригинала) attempts не сжигает — запись откладывается с растущей задержкой и уходит в терминальный DEAD только когда её возраст превысил 24 часа (LOCALIOFFICE-1109, К3); обычные ошибки — по-прежнему 5 попыток → DEAD.
Реестр обработчиков (src/kassa-sync/application/handlers/kassa-event.dispatcher.ts):
| Event type | Назначение | Реализация |
|---|---|---|
core.company.kassa.cash_shift.opened/closed | UPSERT кассовой смены (владелец смены в офисе — Subdivision, см. LOCALIOFFICE-982; posTerminalId payload-а сохраняется как audit-snapshot, subdivisionId резолвится из БД-снэпшота терминала после cross-tenant guard. Partial unique cash_shifts_one_open_per_subdivision гарантирует не более одной OPEN-смены на подразделение.) | ACL (pos/application/event-handlers/cash-shift-sync.handler.ts, интерфейс kassa-sync/domain/acl/cash-shift-inbound.acl.ts) |
core.company.kassa.order.created/updated/cancelled/closed | Snapshot-upsert операционного заказа и его позиций (LOCALIOFFICE-1114): на каждый значимый переход касса публикует полный снимок заказа, все четыре типа применяются одинаково. Out-of-order-защита — по монотонно растущему version заказа кассы (PosOrder.kassaVersion); subdivisionId резолвится из БД-снэпшота терминала после cross-tenant guard; смена ещё не материализована — DependencyMissingError, inbound-механика ждёт. Статусы кассы шире офисных enum-ов — явный маппинг в handler-е (OPEN→DRAFT, SENT_TO_KITCHEN→COOKING, CLOSED→FISCALIZED; позиции: ORDERED→PENDING, SENT/IN_PROGRESS→COOKING, READY/SERVED→READY). Ранний локальный тип pos_order.snapshot (LOCALIOFFICE-1050) касса больше не публикует — обработчик удалён. | ACL (pos/application/event-handlers/pos-order-sync.handler.ts, интерфейс kassa-sync/domain/acl/pos-order-inbound.acl.ts) |
core.company.kassa.payment.created | UPSERT платежа | ACL (pos/application/event-handlers/pos-payment-sync.handler.ts, интерфейс kassa-sync/domain/acl/payment-inbound.acl.ts) |
core.company.kassa.attendance.opened/closed | Доменная логика staffing (audit, расчёт, outbox) | ACL (staffing/application/event-handlers/attendance-sync.handler.ts, интерфейс kassa-sync/domain/acl/attendance-inbound.acl.ts) |
core.company.kassa.receipt.fiscalized | Единственное событие чека: материализация PosReceipt с позициями + фискальные реквизиты (ФПД, № ФД, серийник ФН, рег. номер ККТ, snapshot реквизитов юрлица и точки продаж, опциональный контакт покупателя по 54-ФЗ, теги ФФД позиций) — LOCALIOFFICE-1052. Операционные поля (openedAt/waiterId/salesPointId) берутся из связанного PosOrder по orderId. Out-of-order (фискальное событие раньше cash_shift.opened или order.*) — DependencyMissingError, inbound-механика ждёт. | ACL (pos/application/event-handlers/pos-receipt-sync.handler.ts, интерфейс kassa-sync/domain/acl/receipt-inbound.acl.ts) |
core.company.kassa.cash_shift.fiscal_opened | Фискальное открытие смены: рег. номер ФН и № ФД отчёта об открытии — LOCALIOFFICE-1052. | ACL (pos/application/event-handlers/cash-shift-sync.handler.ts, интерфейс kassa-sync/domain/acl/cash-shift-inbound.acl.ts) |
core.company.kassa.cash_shift.fiscal_z_reported | Z-отчёт: № ФД отчёта о закрытии смены — LOCALIOFFICE-1052. Смена без zReportFdNumber при status = CLOSED считается «не закрытой фискально». | ACL (pos/application/event-handlers/cash-shift-sync.handler.ts, интерфейс kassa-sync/domain/acl/cash-shift-inbound.acl.ts) |
core.company.kassa.receipt.returned | UPSERT возвратного фискального документа в отдельную сущность PosReturnReceipt + PosReturnReceiptItem с обязательным per-item FK originalReceiptItemId — LOCALIOFFICE-1063 / LOCALIKASSA-312. Cross-tenant guard через originalReceipt.cashShift.corporationId; out-of-order (возврат раньше receipt.fiscalized оригинала) — handler выкидывает ошибку, inbound-механика ретраит. RevenueReportEntry со знаком минус строится on-demand через HTTP process-return-receipt/:id (паттерн идентичен продажам). Обратные проводки на закрытии смены подхватываются автоматически: SQL агрегата findCashShiftPaymentBreakdown UNION-ом объединяет sales (все pos_payments смены, после выпиливания discriminator-колонки operation_type) и returns из pos_return_receipts с пропорциональным распределением final_amount по методам оплаты оригинала. | ACL (pos/application/event-handlers/pos-return-receipt-sync.handler.ts, интерфейс kassa-sync/domain/acl/return-receipt-inbound.acl.ts) |
Все handler-ы живут у владельца агрегата в <module>/application/event-handlers/ и регистрируются через ACL-токен в KassaEventDispatcher (см. IMPLEMENT.md §9 «Inbound event-handlers»). На каждое успешно применённое inbound-событие handler возвращает DeferredDomainEvent[] — EventEmitter2-события публикуются процессором после коммита inbound-транзакции, что открывает точку подписки @OnEvent для других модулей (accounting, inventory, reporting).
1.7. Формат сообщения (CloudEvents protocol)
Envelope собирается единым CloudEventBuilder (src/common/integration/cloud-event.builder.ts) — литералы specversion: '1.0', source: 'office-service', datacontenttype: 'application/json' зашиты в нём.
Префикс type (core.company.office) и имя Kafka-топика — каноничные константы OFFICE_EVENT_TYPE_PREFIX и OFFICE_OUTBOX_TOPIC в src/common/ports/outbox.port.ts. При смене конвенции править там, не в публикаторе.
id события генерируется адаптером (randomUUID) и переиспользуется как CloudEvent.id — стабильный идемпотентный ключ для consumer’а между ретраями.
{ "specversion": "1.0", "type": "core.company.office.nomenclature.updated", "source": "office-service", "id": "uuid-event-id", "time": "2028-02-25T10:00:00Z", "subject": "uuid-nomenclature-id", "datacontenttype": "application/json", "data": { "version": 5, "subdivision_ids": ["uuid-subdivision-1", "uuid-subdivision-2"], "payload": { "id": "uuid-nomenclature-id", "name": "Новый Цезарь", "price": 550.00, "is_available": true // ... все поля сущности (version и subdivision_ids НЕ дублируются в payload) } }}Ключевые поля:
data.version— версия сущности. Используется для идемпотентной обработки: если входящийversion≤ текущего в БД — событие пропускается (SKIPPED)data.subdivision_ids— массив ID подразделений, к которым относится сущность. Касса использует массив для роутинга события нужным терминаламsubject— ID изменённой сущностиtype— полный тип события (домен + действие)
1.8. Стратегия Kafka partition-key (LOCALIOFFICE-1003)
Каждое outbox-событие публикуется в Kafka с явным partition-key. Сообщения с одинаковым ключом гарантированно попадают в одну партицию и читаются consumer-ом строго в порядке отправки. Без ключа KafkaJS распределяет сообщения round-robin — связанные события (например, attendance.opened → attendance.closed одного сотрудника) могут переставляться между партициями, и инбокс-консьюмер на стороне кассы вынужден чинить порядок ретраями.
Ключ резолвится по бизнес-агрегату — «наименьшему осмысленному ordering scope»:
entityType | partition-key | Источник | Что гарантирует |
|---|---|---|---|
corporation | entityId | id сущности | порядок настроек одной корпорации |
subdivision | entityId | id сущности | порядок изменений одного подразделения |
sales_point | entityId | id сущности | порядок изменений одной точки продаж |
pos_terminal | entityId | id сущности | порядок изменений одного терминала |
employee | entityId | id сущности | hire/update/dismiss одного сотрудника по порядку |
position | entityId | id сущности | порядок правок одной должности |
attendance_type | entityId | id сущности | порядок правок одного типа явки |
payment_method | entityId | id сущности | порядок правок одного метода оплаты (включая состав привязок к подразделениям — он едет в payload.subdivisionIds, LOCALIOFFICE-1042/LOCALIOFFICE-1062) |
payment_system | entityId | id сущности | порядок правок одной платёжной системы (глобальный справочник, fanout по корпорациям, LOCALIOFFICE-1042) |
nomenclature | entityId | id сущности | порядок правок одной позиции |
nomenclature_group | entityId | id сущности | порядок правок одной группы |
nomenclature_modifier | entityId | id сущности | порядок правок одного модификатора |
tech_card | entityId | id сущности | порядок правок одной техкарты |
shift_type | entityId | id сущности | порядок правок одного шаблона смены |
attendance | payload.employeeId | payload | все явки одного сотрудника по порядку (включая узкие cancelled / force_closed, LOCALIOFFICE-1014) |
schedule_entry | payload.employeeId | payload | весь график одного сотрудника по порядку |
payment_rate | payload.employeeId | payload | все ставки одного сотрудника по порядку |
warehouse_stock | payload.warehouseId | payload | все изменения остатков одного склада по порядку |
Резолвер — src/outbox/domain/services/outbox-partition-key.resolver.ts. Чисто синхронный, exhaustive по OutboxEntityType (новый тип без обновления резолвера ловится TypeScript-never-assert ещё на этапе компиляции). Резолвер вызывается в PrismaOutboxAdapter.record(...), результат персистится в столбце outbox_events.partition_key и затем передаётся в kafka.publishOrThrow(..., { key }) шедулером — это даёт стабильный ключ при ретраях публикации.
Что НЕ решает partition-key стратегия. Кросс-сущностный порядок (например, cash_shift.opened должен прийти до attendance.opened с тем же cashShiftId) не гарантируется: события разных типов идут с разными ключами и могут попасть в разные партиции. Это сознательный размен — за ordering между неоднородными агрегатами отвечает идемпотентность инбокса кассы + retry с экспоненциальным backoff’ом. Цель partition-key — устранить рассогласования внутри одного агрегата, где ретрай маскировал бы реальную деградацию.
Эксплуатационные ограничения. Стандартный Kafka-партиционер использует hash(key) mod partitions. Менять число партиций топика office.events без перераспределения существующих сообщений нельзя — после resize старые ключи попадут в другие партиции, и in-flight события того же агрегата окажутся раскинуты между двумя партициями, нарушая FIFO. Корректный сценарий resize: создать новый топик, переключить producer на него, дождаться дренажа старого, удалить старый. Число партиций фиксируется в конфигурации кластера и в @locali/office-contracts README на стороне команды кассы.
Регистрация новой OutboxEntityType. Расширяя OUTBOX_ENTITY (см. IMPLEMENT.md §8), нужно:
- Добавить новый case в
OutboxPartitionKeyResolver— выбрать наименьший осмысленный агрегат. - Обновить таблицу выше.
- Если новая сущность ссылается на существующий агрегат (как
attendance→ employee) — переиспользовать тот же ключ, чтобы оба домена ехали в одну партицию.
2. КАССА (Kassa)
Облачный микросервис между офисом и POS-терминалами. Принимает события из Kafka, обрабатывает их, предоставляет REST API и WebSocket для POS, генерирует свои события. Касса является единственной точкой контакта для POS и центром управления схемой и логикой для терминалов.
Почему касса — отдельный сервис:
- Разделение ответственности — офис не знает про POS, маппинги, схемы терминалов
- Независимость деплоя — касса и офис обновляются и масштабируются независимо
- Независимая точка отказа — офис может быть временно недоступен, касса продолжает отдавать POS данные из своей БД
- Собственная логика — уведомления, агрегация, обработка чеков/смен. Не нагружает офис ненужной ответственностью
- Подготовка к будущему — проще заложить сейчас, чем выносить из офиса позже
2.1. Приём событий из Kafka (Consumer)
Kafka ──► Consumer ──► Processing for main entities ──► Сохранение в БД кассыKafka-consumer слушает topic (по умолчанию office.events), парсит CloudEvent и передаёт в InboundEventService.process(). Проверка version выполняется не в общем сервисе, а в обработчике сущности (InboundEventHandler.handle()) — каждый обработчик сам сравнивает входящий version с текущей в БД и возвращает APPLIED или SKIPPED. Upsert сущности и обновление статуса записи в inbound_events происходят в одной транзакции.
Обработка входящих событий:
┌─────────────────────────────────────────────┐│ Processing for main entities ││ ││ 1. Save metadata and payload ││ in inbound_events ││ (revision назначается автоматически) ││ │ ││ ▼ ││ 2. event.data.version ≤ ││ current_db_version? ││ │YES │NO ││ ▼ ▼ ││ SKIPPED 3. upsert payload ││ │ ││ ▼ ││ 4. update status ││ (applied/failed) ││ for inbound_events │└─────────────────────────────────────────────┘Проверка version обеспечивает идемпотентность — если пришло событие со старой или равной версией, оно пропускается (SKIPPED).
2.2. Таблица inbound_events
Основа для /api/inbox. Каждое входящее событие получает автоинкрементный revision, который POS использует для догоняющей синхронизации.
┌─────────────────────────────────────────┐│ inbound_events │├─────────────┬───────────────────────────┤│ PK │ UniqueID │├─────────────┼───────────────────────────┤│ Идемпот. │ kafka_event_id (UNIQUE) │ ◄── CloudEvent.id; дубль доставки отсекается БД├─────────────┼───────────────────────────┤│ Event │ event_type │ ◄── полный тип: "core.company.office.nomenclature.updated"│ metadata │ entity_type │ ◄── "nomenclature"│ │ entity_id ││ │ version ││ │ subdivision_ids (String[])│ ◄── массив из data.subdivision_ids, для роутинга├─────────────┼───────────────────────────┤│ Полный │ payload ││ payload │ status ││ Retry │ attempts │ ◄── счётчик попыток (инкремент при claimForProcessing)│ │ last_error │ ◄── sanitized сообщение последней ошибки│ │ next_retry_at │ ◄── момент следующей попытки (exponential backoff)│ │ created_at ││ │ processed_at │├─────────────┼───────────────────────────┤│ Sync │ revision (autoincrement) │ ◄── глобальный автоинкремент для /api/inbox│ │ sync_to_pos │ ◄── boolean: нужно ли отдавать в /api/inbox└─────────────┴───────────────────────────┘
Жизненный цикл статуса:
RECEIVED ──► PROCESSING ──► APPLIED ─── терминал ├─► SKIPPED ─── терминал (старая версия / битый payload; │ «нет handler-а» — НЕ терминал: deferRetry возвращает │ в RECEIVED с nextRetryAt +1 час до деплоя обработчика; │ битый CloudEvent-envelope отсекается consumer-ом │ ещё до записи в inbound_events) └─► FAILED ──┐ │ scheduler retry (next_retry_at <= now AND attempts < 5) ▼ PROCESSING (новая попытка)kafka_event_id — CloudEvent.id входящего сообщения. UNIQUE-индекс гарантирует, что повторная доставка одного и того же CloudEvent не порождает вторую запись в inbound_events. Идемпотентность приёма обеспечивается БД-ограничением, а не сравнением полей в коде.
revision — глобальный монотонно возрастающий счётчик. Каждое входящее событие получает следующее значение. POS запрашивает изменения по revision > last_synced_revision.
sync_to_pos — флаг, определяющий нужно ли событие POS-терминалам. Устанавливается при записи на основе entity_type. Не все события из Kafka предназначены для POS (например, внутренние события бухгалтерии).
subdivision_ids — массив затронутых подразделений, извлекается из data.subdivision_ids CloudEvent (см. §1.4). Используется для фильтрации событий в /api/inbox и WebSocket push через Prisma-оператор hasSome: терминалу отдаются только события, массив subdivision_ids которых пересекается с подразделениями терминала. Корректность fan-out’а (включая попадание события в подразделения, которые потеряли доступ к сущности, и раскрытие «доступен везде») гарантируется источником на стороне офиса — касса доверяет envelope и не дополняет его на receiver-side.
attempts / last_error / next_retry_at — retry-семантика, симметричная outbox_events. На исключении handler-а событие переводится в FAILED с next_retry_at = now + backoff(attempts). Фоновый InboundRetryScheduler раз в 10 секунд подбирает RECEIVED/FAILED события, у которых attempts < 5 AND next_retry_at <= now(), и атомарно переводит их в PROCESSING (один UPDATE ... WHERE status IN (...) — claim защищён от гонки consumer↔scheduler). Раз в минуту тот же scheduler сбрасывает зависшие PROCESSING-записи (старше 15 минут) обратно в FAILED.
Отсутствие родительской реплики справочника (например, событие номенклатуры пришло раньше своей группы) с LOCALIKASSA-399 не уводит событие в FAILED: FK между репликами сняты, дочерняя запись применяется сразу (APPLIED), а недостающая ссылка отслеживается отдельно через kassa_dangling_references (см. § 1.8 и IMPLEMENT.md §7.8). FAILED-retry остаётся для настоящих ошибок обработки (битый payload, недоступность БД).
2.3. Генерация исходящих событий (Transaction)
Касса имеет собственные Prisma-модели (CashShift, Receipt, ReceiptItem, Payment, CashShiftDocument в prisma/schema/sales.prisma) и REST-эндпоинты для POS: POST /api/receipts, POST /api/cash-shifts (open/close), POST /api/cash-shift-documents/{cash-in,cash-out,cashier-change,x-report}. Каждая бизнес-операция кассы и запись outbox — в одной транзакции:
-- Всё атомарно:INSERT INTO receipts (...);UPDATE cash_shifts SET ...;INSERT INTO outbox_events (event_type, entity_type, entity_id, payload, status);2.4. Outbox на стороне кассы
Структура outbox_events аналогична офисной (topic, channel, eventType, entityType, entityId, terminalId, payload, status, attempts, lastError, nextRetryAt, publishedAt; отдельной колонки partition-key нет — ключ публикации берётся из entityId, см. §1.8), плюс nullable-колонки traceparent / tracestate (LOCALIKASSA-386) — снимок W3C trace-контекста HTTP-запроса терминала, в котором событие было записано. Outbox Scheduler восстанавливает его при публикации и кладёт в Kafka message headers, чтобы трейс оставался единым через асинхронную границу outbox (терминал → касса → офис); NULL — событие записано вне трейса. Свой Outbox Scheduler → Producer публикует события в Kafka: запускается каждые 5 секунд, забирает батч до 100 PENDING-записей на канал.
outbox_events ──► Outbox Scheduler ──► Producer ──► Kafka broker
Статусы: PENDING ──► PUBLISHING ──► PUBLISHED ├──► PENDING (retry с backoff, attempts++; │ ошибки соединения попытку не сжигают) └──► FAILED (терминал: attempts >= maxAttempts канала; реанимация — admin requeue, LOCALIKASSA-405 К8)FIFO-гейт публикации (LOCALIKASSA-405, К2). findPendingBatch не выбирает событие, пока у той же сущности (entityId в рамках того же topic) есть более раннее (createdAt, tie-break id) НЕопубликованное событие. Без гейта retry с backoff переупорядочивал бы события одной сущности: receipt.updated уезжал бы в офис раньше receipt.created, ушедшего в backoff. Терминальный FAILED блокирует всю сущность до ручного requeue через admin API (POST /api/admin/outbox/failed/:id/requeue, POST /api/admin/outbox/failed/requeue по entityType) — это сознательно: продолжать публикацию хвоста поверх потерянного события нельзя. Число сущностей, чьё старейшее неопубликованное событие старше 60 секунд, видно в gauge kassa_outbox_stalled_aggregates; возраст старейшего неопубликованного события — kassa_outbox_oldest_pending_age_seconds.
2.5. REST API /api/inbox — из БД кассы
Касса отдаёт /api/inbox из собственной БД (таблица inbound_events). Не проксирует в офис.
POS ──► Касса GET /api/inbox?last_synced_revision=N (Authorization: Bearer <JWT терминала>) │ ▼ SELECT FROM inbound_events WHERE revision > N AND sync_to_pos = true AND status = 'APPLIED' AND subdivision_ids && <подразделения терминала> -- hasSome ORDER BY revision ASC LIMIT 100Подразделения терминала касса берёт из JWT-payload POS (subdivisionIds, выдаётся при авторизации через /auth/terminal). Фильтр использует Prisma hasSome, то есть возвращаются события, у которых массив subdivisionIds пересекается с массивом терминала.
Важно: фильтр status = 'APPLIED' исключает SKIPPED-события (старые версии) из ответа. Без него POS мог бы получить устаревшие данные и перезаписать ими актуальные.
Формат ответа /api/inbox
{ "events": [ { "event_type": "core.company.office.nomenclature.updated", "entity_id": "uuid-nomenclature-id", "entity_type": "nomenclature", "action": "updated", "payload": { "id": "uuid-nomenclature-id", "name": "Новый Цезарь", "price": 550.00, "is_available": true, "version": 5 }, "revision": 124 }, { "event_type": "core.company.office.employee.created", "entity_id": "uuid-employee-id", "entity_type": "employee", "action": "created", "payload": { "id": "uuid-employee-id", "full_name": "Иванов Иван", "role": "cashier", "pin": "1234", "version": 1 }, "revision": 125 } ], "last_revision": 125, "has_more": false}event_type— полный тип события (prefix.entity_type.action). POS не собирает его из частей — использует as-isevents— массив изменений изinbound_eventsсrevision > last_synced_revision,sync_to_pos = true,status = 'APPLIED'иsubdivision_ids, пересекающимися с подразделениями терминала, отсортированных поrevisionASClast_revision— максимальная ревизия в текущем батче. POS сохраняет её какlast_synced_revisionhas_more— есть ли ещё данные за пределами батча. Еслиtrue— POS делает повторный запрос сlast_synced_revision=last_revisionpayloadсодержит все поля сущности включаяversion— POS использует поля как параметры для собственных UPSERT-ов в локальную SQLiteevent_typeприходит готовым из кассы — POS использует его напрямую как ключ роутинга в свой локальный маппинг handler-ов
Синхронизируемые сущности
Через inbound_events касса обрабатывает следующие сущности офиса: employee, employee_position, position, subdivision, sales_point, payment_method, deposit_withdrawal_type (LOCALIKASSA-297), nomenclature_group, pos_terminal, nomenclature, modifier, tech_card, tech_card_ingredient, warehouse_stock, corporation, preparation_place (LOCALIKASSA-311). Регистрация обработчиков — декларативно в InboxCoreModule (handler сам объявляет entityType, eventTypes, syncToPos).
Все эти сущности приходят через один Kafka-топик office.events (LOCALIKASSA-248). Отдельных inbound-топиков для каких-либо сущностей касса больше не слушает.
2.6. Синхронизируемые сущности офиса
Через inbound_events касса обрабатывает следующие сущности офиса: employee, employee_position, position, subdivision, payment_method, nomenclature_group, pos_terminal, nomenclature, modifier, tech_card, tech_card_ingredient, warehouse_stock, corporation. Регистрация handler-ов — декларативно в InboxCoreModule кассы (handler сам объявляет entityType, eventTypes, syncToPos). Все эти сущности приходят через один Kafka-топик office.events (LOCALIKASSA-248) — отдельных inbound-топиков для каких-либо сущностей касса больше не слушает.
DDL локальной SQLite POS и маппинг event_type → SQL касса по сети не отдаёт: и то, и другое живёт внутри бандла POS — реестр в electron/modules/sync/registry.ts, служебные таблицы и список миграций — в electron/shared/db/schema.ts (см. раздел 3.4).
Офис дополнительно публикует в outbox сущность sales_point (LOCALIOFFICE-983) — точка продаж внутри подразделения. Контракт уже идёт через OUTBOX_ENTITY.SALES_POINT с subdivisionIds=[entity.subdivisionId], локальный wire-payload — в src/common/ports/outbox-payloads.ts (SalesPointOutboxPayload). Инвариант, который держится в офисе: у каждого Subdivision ровно одна SalesPoint с isDefault=true — её создаёт SubdivisionService.create одной транзакцией с подразделением (LOCALIOFFICE-984), и она же служит fallback-ом при inbound-приёме чека без явного salesPointId. Регистрация DDL/обработчика на стороне кассы (приём sales_point-события и пометка чека выбранной точкой при логине сотрудника) делается отдельной задачей на стороне кассы; до её выкатки sync_to_pos для entity_type=sales_point не выставляется.
Офис также публикует в outbox сущность shift_type (LOCALIOFFICE-1002) — шаблон плановой смены корпорации. Контракт — OUTBOX_ENTITY.SHIFT_TYPE с subdivisionIds=[] (корпоративный справочник, как attendance_type/position), wire-payload — ShiftTypePayload из @locali/office-contracts@4.3.0, локально расширен в src/common/ports/outbox-payloads.ts как ShiftTypeOutboxPayload. Стиль публикации — снапшот: любое изменение (create / update / soft-delete / restore) уезжает одним OUTBOX_ACTION.UPDATED с актуальным payload.isDeleted и payload.version. Сидер шаблонов (002-staffing-defaults.seeder.ts) пишет в БД напрямую через prisma.shiftType.create, минуя ShiftTypeService, поэтому при пересеяле дев-стенда outbox не наполняется (это умышленно). Регистрация DDL/обработчика на стороне кассы — LOCALIKASSA-286; до её выкатки sync_to_pos для entity_type=shift_type не выставляется.
Для attendance офис, помимо обычного attendance.updated со snapshot-payload, публикует два узких события (LOCALIOFFICE-1014) — attendance.cancelled и attendance.force_closed. Они дополняют, а не заменяют UPDATED: на затронутых сценариях из офиса уходит пара событий (полный snapshot для всех подписчиков + узкое — для кассы). Касса подписывается только на узкие события, чтобы не фильтровать шумный поток правок и при этом синхронизировать локальную копию явки (снять partial unique-индекс «одна открытая явка на сотрудника» и/или перевести OPEN→OVERDUE/CLOSED).
attendance.cancelled(OUTBOX_ACTION.CANCELLED) — публикуется при soft-delete явки в офисе: отклонениеPENDING_PROXY_CARD_APPROVAL(AttendanceService.resolveProxyCardREJECT), отклонениеPENDING_OFF_SCHEDULE_APPROVAL(resolveOffScheduleREJECT), обычное удаление (delete), перекрытие реальной явкой авто-явки отсутствия (applyAbsenceOverride). Wire-payload —AttendanceCancelledOutboxPayloadв src/common/ports/outbox-payloads.ts (минимум: id, corp, subdivision, employee,reason, version). Причина (AttendanceCancellationReason) даёт кассе контекст:OFF_SCHEDULE_REJECTED/PROXY_CARD_REJECTED/OVERRIDDEN_BY_REAL_ARRIVAL/DELETED_BY_OFFICE.attendance.force_closed(OUTBOX_ACTION.FORCE_CLOSED) — публикуется, когда офис закрыл явку без участия кассы: schedulerautoCloseOverdue(только приsource === "OFFICE", причинаAUTO_CLOSED_ON_OVERDUE, статусOVERDUE) и увольнение сотрудника с открытой явкой (AttendanceService.autoCloseOnDismissal, вызывается изEmployeeService.dismiss; причинаDISMISSED, статусCLOSED). Wire-payload —AttendanceForceClosedOutboxPayload(id, corp, subdivision, employee, status: CLOSED|OVERDUE, reason, version). При inbound-сценариях от кассы (closeFromKassa,autoCloseStaleOpenAttendanceсsource: "KASSA") узкое событие не публикуется — касса не уведомляет сама себя.
Routing: subdivisionIds: [attendance.subdivisionId] — событие летит только в касса-копию подразделения, владеющего явкой. Partition-key — payload.employeeId (общий резолвер OutboxPartitionKeyResolver для всех вариантов payload). Wire-контракт живёт в @locali/office-contracts@5.1.0 (AttendanceCancelledPayload / AttendanceForceClosedPayload + EVENT_TYPES.attendanceCancelled / EVENT_TYPES.attendanceForceClosed); локально в src/common/ports/outbox-payloads.ts сделан только реэкспорт под историческими именами Attendance*OutboxPayload. DDL/обработчик на стороне кассы — LOCALIKASSA-293.
Механизм работы
- Бэкенд кассы регистрирует handler для
entity_typeвInboxCoreModule(handler сам объявляетeventTypes,syncToPos). Никакой агрегированной DDL-схемы по сети не отдаётся — DDL живёт у POS в electron/modules/sync/registry.ts. - POS получает события (через
/api/inboxили WebSocket) и применяет к локальной SQLiteEVENT_SQL[event.event_type]с биндомpayload. Маппинг и DDL обновляются вместе с бандлом POS (см. раздел 4 про auto-update). - Фронтенд маппит кнопки UI на вызов REST-эндпоинтов кассы (
POST /api/receiptsи т.п.) — бизнес-логика на стороне POS сводится к минимуму.
2.7. Idempotency-Key — защита от дублирования при нестабильной связи
При нестабильном соединении POS может отправить запрос (создание чека), получить таймаут и не знать, прошёл ли запрос. При повторной отправке без защиты — дубликат чека.
Решение: заголовок Idempotency-Key на всех мутирующих запросах POS → Касса.
┌─────────────────────────────────────────┐│ idempotency_keys │├─────────────┬───────────────────────────┤│ PK │ key (UUID из заголовка) │├─────────────┼───────────────────────────┤│ │ response_status (INT) ││ │ response_body (JSON) ││ │ created_at │└─────────────┴───────────────────────────┘Логика (NestJS IdempotencyInterceptor, зарегистрирован глобально через app.useGlobalInterceptors(...) в main.ts кассы):
- POS отправляет
POST /api/receiptsс заголовкомIdempotency-Key: <uuid> - Interceptor пропускает
GET-запросы; дляPOST/PUT/PATCH/DELETEпроверяет: есть ли ключ вidempotency_keys?- Есть → вернуть сохранённые
response_statusиresponse_body(не выполнять повторно) - Нет → выполнить запрос, сохранить результат с ключом, вернуть ответ
- Есть → вернуть сохранённые
- TTL-очистка: cron каждые 24 часа удаляет записи старше 48 часов
Race condition: key имеет UNIQUE constraint в БД. Если два параллельных запроса с одним ключом пришли одновременно:
- Первый INSERT выигрывает
- Второй ловит
UniqueConstraintViolation→ делает SELECT + ждёт пока первый запишет результат → возвращает его
Важно:
- Применяется только к мутирующим эндпоинтам (POST/PUT/PATCH/DELETE), не к GET
- POS генерирует
Idempotency-Keyпри созданииpending_request, а не при отправке — так повторная отправка использует тот же ключ - Ключ — UUID v4, генерируется на стороне POS
2.8. WebSocket — real-time push для POS
После авторизации и первичной синхронизации POS устанавливает WebSocket-соединение с кассой (Socket.IO, namespace из process.env.WS_NAMESPACE). Это основной канал получения обновлений.
POS ──► WS CONNECT wss://kassa/<namespace> handshake.auth.token = <JWT терминала> (или ?token=... в query) ◄── событие 'connected'
Касса при получении нового события из Kafka:1. Consumer обрабатывает событие последовательно (inbound_events + upsert) в транзакции2. Только если status = APPLIED и syncToPos = true:3. TerminalConnectionService.pushInboxEvents() рассылает через broadcastToSubdivision('inbox', payload) каждому подразделению из subdivisionIds события (индекс subdivisionIndex в памяти)4. Пушит событие 'inbox' через WebSocket в том же формате, что и /api/inboxИмена WebSocket-событий: inbox (push событий с inbound_events), connected (приветствие после handshake), ping/pong (keep-alive), error, message. На стороне клиента POS подписывается на inbox и вызывает ту же processEvents(), что и для /api/inbox.
Важно: Consumer обрабатывает события последовательно (single-threaded). Это гарантирует, что revision назначаются в порядке обработки и WebSocket push отправляется только после коммита в БД. Без этого возможен пропуск событий при параллельной обработке.
Формат WS-сообщения идентичен ответу /api/inbox:
{ "events": [{ "event_type": "...", "entity_type": "...", "action": "...", "payload": {...}, "revision": 543 }], "last_revision": 543}POS использует одну и ту же функцию для обработки событий из WebSocket и из /api/inbox. WebSocket — основной канал, /api/inbox — recovery после обрыва.
2.9. Авторизация POS
Касса авторизует терминал собственным эндпоинтом:
POS ──► POST /api/auth/terminal body: { code: string, password: string } ◄── { accessToken, terminalId, subdivisionIds, corporationId }- Касса ищет терминал по
codeв таблицеpos_terminals(поля:id,code(unique),name,passwordHash,isActive,subdivisionIdFK) и сверяетpasswordчерез bcrypt (с защитой от timing-атак) - В payload JWT кладёт
{ terminalId, subdivisionIds, corporationId }— в БД у терминала одинsubdivisionId(FK →Subdivision), в JWT он оборачивается в массивsubdivisionIdsдля единообразия с форматомinbound_events.subdivisionIds expiresIn/refresh-token не возвращаются — access-токен без явного TTL в ответе (TTL задаётся конфигом JWT)- Этот JWT используется для
/api/inbox(Bearer) и WebSocket (handshake.auth.tokenили?token=...) subdivisionIdsиз JWT — источник истины для фильтрации событий в/api/inboxи для маршрутизации WebSocket push
3. POS (Point of Sale — Терминал)
Клиентское приложение на кассовом терминале с поддержкой offline-first работы. POS хранит локальную SQLite-копию справочников офиса (better-sqlite3 внутри userData) и применяет к ней входящие события самостоятельно — DDL и маппинг event_type → SQL живут внутри бандла POS, касса схему по сети не отдаёт.
3.1. Инициализация — локальная схема из бандла
При первом запуске POS применяет DDL из локального реестра (electron/modules/sync/registry.ts) — CREATE TABLE IF NOT EXISTS + CREATE INDEX IF NOT EXISTS — и проставляет PRAGMA user_version = MIGRATIONS.length. Существующие установки прокручивают MIGRATIONS (append-only массив ALTER/CREATE) начиная с текущего user_version (см. electron/shared/db/schema.ts). Доставка новых полей/таблиц — через релиз нового бандла POS (раздел 4).
3.2. Приём событий — два транспорта, один обработчик
POS получает данные через два канала:
- WebSocket — основной, real-time push от кассы
/api/inboxpolling — recovery после обрыва WebSocket или при первичной синхронизации
Оба канала используют одинаковый формат и одну функцию обработки:
function processEvents(data) { for (event of data.events) { // event_type приходит готовым из кассы — POS не собирает его из частей const sql = EVENT_SQL[event.event_type]; // локальный маппинг из registry.ts if (sql) execute(sql, event.payload); // db.prepare(sql).run(payload) // Неизвестные event_type (нет ключа в ALLOWED_EVENT_TYPES) — игнорируются } updateLastSyncedRevision(data.last_revision);}
// Источник 1: WebSocket (основной, real-time)ws.onMessage(msg => processEvents(msg));
// Источник 2: /api/inbox (recovery после обрыва)async function recover() { const rev = getLastSyncedRevision(); const data = await fetch("/api/inbox?last_synced_revision=" + rev); processEvents(data); if (data.has_more) recover();}3.3. Генерация событий (Terminal → Касса)
POS генерирует бизнес-события (создание чеков, закрытие смен и т.д.) и отправляет их напрямую через REST-эндпоинты кассы — каждая бизнес-операция имеет свой эндпоинт на кассе:
POS Касса │ │ │ POST /api/receipts ────────────► │ (создание чека) │ Idempotency-Key: <uuid> │ │ POST /api/cash-shifts/close ──► │ (закрытие смены) │ Idempotency-Key: <uuid> │ │ │ │ │ Касса в транзакции: │ │ 1. Бизнес-логика (INSERT/UPDATE) │ │ 2. INSERT outbox_events → KafkaPOS не хранит outbox_events. Вместо этого, при отсутствии сети POS откладывает вызовы REST-эндпоинтов в локальную очередь:
┌─────────────────────────────────────────┐│ pending_requests (POS) │├─────────────┬───────────────────────────┤│ PK │ UniqueID │├─────────────┼───────────────────────────┤│ Request │ method ││ metadata │ url ││ │ body (JSON) ││ │ idempotency_key (UUID) │ ◄── генерируется при создании записи├─────────────┼───────────────────────────┤│ Статус │ status ││ │ last_error ││ │ attempts ││ │ created_at │└─────────────┴───────────────────────────┘
Статусы: PENDING ──► SENDING ──► SENT / FAILEDПри восстановлении сети POS отправляет отложенные запросы в порядке создания. idempotency_key генерируется при создании записи, что гарантирует что повторная отправка использует тот же ключ.
3.4. Service Worker — синхронизация и офлайн
Service Worker управляет синхронизацией:
┌───────────────────────────────────────────────────────────────┐│ Service Worker ││ ││ СЦЕНАРИЙ 1: Первый вход / первичная инициализация ││ ───────────────────────────────────────────────── ││ Применить локальный DDL из бандла POS (registry.ts) ││ + MIGRATIONS по PRAGMA user_version ││ ││ GET /api/inbox (без параметров) ││ → Полная загрузка всех справочников из БД кассы (батчами) ││ → Применение EVENT_SQL для каждого события ││ → Сохранение last_synced_revision из ответа ││ ││ ││ СЦЕНАРИЙ 2: WebSocket (основной режим) ││ ───────────────────────────────────────────────── ││ WS CONNECT wss://kassa/ws ││ → Получение real-time событий ││ → Применение EVENT_SQL (та же функция processEvents) ││ → Обновление last_synced_revision ││ ││ ││ СЦЕНАРИЙ 3: Recovery после обрыва WebSocket ││ ───────────────────────────────────────────────── ││ ││ ┌──────────────────────────────────┐ ││ │ GET /api/inbox? │ ││ │ last_synced_revision=N │ ◄── из БД кассы ││ └──────────────┬───────────────────┘ ││ │ ││ ▼ ││ Применение EVENT_SQL ││ Обновление last_synced_revision ││ ││ ┌──────────────────────────────────┐ ││ │ Отправка отложенных │ ││ │ pending_requests → касса │ ││ │ (с Idempotency-Key) │ ││ │ После успеха: пометка │ ││ │ статуса SENT │ ││ └──────────────────────────────────┘ ││ ││ WS RECONNECT │└───────────────────────────────────────────────────────────────┘3.5. Локальная SQLite POS — служебные и доменные таблицы
DDL живёт внутри бандла POS (electron/shared/db/schema.ts, electron/modules/sync/registry.ts) — касса не отдаёт схему по сети, POS применяет её сам при первом запуске.
Служебные таблицы (создаются первыми)
┌──────────────────────────────┐│ local_settings │ ◄── key/value: last_synced_revision, last_connected_at и пр.├──────┬───────────────────────┤│ PK │ key (TEXT) ││ │ value (TEXT NOT NULL) │└──────┴───────────────────────┘
┌─────────────────────────────────────────────────────────────────┐│ pending_requests │ ◄── очередь отложенных POST/PUT/PATCH/DELETE├──────┬──────────────────────────────────────────────────────────┤│ PK │ id (TEXT) ││ │ method, url, body, headers (TEXT/JSON) ││ │ idempotency_key (TEXT NOT NULL) ││ │ status TEXT NOT NULL DEFAULT 'PENDING' ││ │ attempts INTEGER NOT NULL DEFAULT 0 ││ │ last_error TEXT ││ │ created_at INTEGER NOT NULL DEFAULT (strftime('%s','now'))│└──────┴──────────────────────────────────────────────────────────┘ + INDEX idx_pending_status_created (status, created_at)local_settings хранит как минимум last_synced_revision (обновляется из last_revision в ответе /api/inbox и WebSocket-сообщений) и last_connected_at. Используется как key/value, чтобы добавление новой настройки не требовало миграции схемы.
Таблицы синхронизируемых сущностей
Декларативно собираются из ENTITY_REGISTRY (electron/modules/sync/registry.ts). Каждая запись реестра содержит ddl, опциональные indexes, readSql и опциональный event (suffix + UPSERT/DELETE SQL + items SQL для вложенных массивов). Производные ENTITY_DDL/EVENT_SQL/ITEMS_SQL пересобираются автоматически. На загрузке модуля registry.ts сверяет каждый сгенерированный event_type с каталогом EVENT_TYPES из @locali/office-contracts — рассогласование падает рано, а не на первом «event_type не распознан» из inbox.
Текущие таблицы: employees, employee_positions, positions, subdivisions, sales_points, payment_methods, nomenclature_groups, pos_terminals, nomenclatures, modifiers, tech_cards, tech_card_ingredients, warehouse_stocks. Все DDL — через CREATE TABLE IF NOT EXISTS + CREATE INDEX IF NOT EXISTS (повторный запуск не падает).
Миграции и user_version
POS использует PRAGMA user_version для версионирования схемы:
SCHEMA(в registry.ts) — целевое состояние. Свежие установки применяют её сразу и проставляютuser_version = MIGRATIONS.length.MIGRATIONS(schema.ts) — append-only массив. Каждый элемент — один ALTER/CREATE/UPDATE для переходаN → N+1, идемпотентный в своём шаге.- Прод-кассы прокручивают миграции по порядку, начиная с текущего
user_version. После успехаuser_versionинкрементируется — повторно шаг не выполнится.
При добавлении/изменении поля в офисной сущности правится ddl в ENTITY_REGISTRY (для свежих установок) и добавляется новый элемент в MIGRATIONS (для существующих POS). Касса в этом не участвует — новый DDL доезжает до терминалов только с новым бандлом POS (см. раздел 4).
Применение входящих событий
POS получает событие через /api/inbox или WebSocket → берёт event.event_type → ищет SQL в локальной мапе EVENT_SQL → выполняет db.prepare(sql).run(event.payload). Параметры именованные (:id, :version, …) совпадают с полями payload. Если у события есть itemsSql (например, ингредиенты техкарты) — массив из payload[arrayField] вставляется отдельным prepared-statement в транзакции с основным UPSERT-ом.
Неизвестные event_type (нет ключа в ALLOWED_EVENT_TYPES) игнорируются — POS «забывает» события, для которых не подготовлен handler в текущей версии бандла.
4. Доставка клиента POS (auto-update)
Клиентское приложение POS — Electron-сборка под Windows (NSIS). У неё два независимых канала обновлений: .exe едет через electron-updater (медленный канал), renderer-бандл — отдельным zip (быстрый канал). Оба используют один S3-bucket в Yandex Object Storage, но разные префиксы.
4.1. Desktop-канал: electron-updater + Yandex S3
Содержимое канала — собранный NSIS-инсталлятор (.exe ~80 МБ): main + preload + IPC + DDL + SQL + embedded dist/. Релизы редкие — раз в спринт или при изменении IPC-контракта/схемы. Реализация: electron/modules/updater/desktop.ts (тикет LOCALIKASSA-263).
S3 (locali-kassa-updates/desktop/) POS (Electron-main) │ │ │ latest.yml │ 1. initDesktopUpdater(win) — на старте + раз в час │ Kassa Setup X.Y.Z.exe │ (TIMING.desktopUpdateCheckMs = 60 * 60 * 1000) │ *.blockmap │ 2. autoUpdater.checkForUpdates() │ │ 3. update-available → IPC «updater:available» │ ◄────── GET latest.yml ──────────────────── │ │ ────── update info ─────────────────────► │ 4. autoDownload = true (фон, blockmap-diff) │ ◄────── GET .exe + .blockmap (diff) ────── │ 5. update-downloaded → IPC «updater:downloaded» │ │ 6. autoInstallOnAppQuit = true: применится │ │ при следующем штатном выходе. │ │ Кнопка «Перезапустить» в UI зовёт IPC │ │ «updater:quit-and-install» → autoUpdater.quitAndInstall()- Bucket:
locali-kassa-updates/desktop/в Yandex Cloud, публичный read. electron-builderpublish:provider: generic(read-only) + ручнойaws s3 syncна release. CI-jobrelease-desktopсобирает.exeчерез electron-builder и публикует*.exe+*.blockmap+latest.ymlна тэгv*.*.*.- В dev (
!app.isPackaged) updater полностью отключён —initDesktopUpdaterвозвращается сразу, чтобы dev-машина не пыталась обновляться. - NSIS-only: portable-exe через
electron-updaterне обновляется.
4.2. Web-канал: manifest + zip + sha256
Renderer-бандл (dist/ от Vite ~1-2 МБ) — отдельный канал, чтобы гонять UI-фиксы без переустановки клиента и без пересборки electron-приложения. Реализация: electron/modules/updater/web.ts (тикет LOCALIKASSA-264).
S3 (locali-kassa-updates/web/) POS (Electron-main) │ │ │ manifest.json (Cache-Control: no-cache) │ 1. checkAndDownload(WEB_MANIFEST_URL) перед createWindow() │ <version>/bundle.zip (immutable) │ 2. GET manifest → { version, bundleUrl, sha256, compatMain? } │ │ 3. semver.satisfies(app.getVersion(), compatMain) — если нет, skip │ ◄────── GET manifest.json ───────────────── │ 4. installed.version === manifest.version — если да, skip │ ────── { version, bundleUrl, sha256 } ────► │ 5. GET zip → проверить sha256 │ ◄────── GET <version>/bundle.zip ────────── │ 6. extract-zip → userData/web/<version>.staging.<pid>/ │ │ 7. fs.renameSync → userData/web/<version>/ (атомарно) │ │ 8. write userData/web/current.json через .tmp + rename │ │ 9. BrowserWindow.loadFile(resolveIndexPath(embedded)) │ │ — установленный bundle, либо embedded dist из .asarСтруктура bucket:
locali-kassa-updates/web/├── manifest.json ← Cache-Control: no-cache (постоянный URL, всегда указывает на актуальную версию)├── a3f2d91/bundle.zip ← одна папка на версию (sha-хэш коммита)├── b14ef8c/bundle.zip ← старые версии остаются — rollback правкой manifest.json└── ...Особенности:
compatMain: semver-диапазон desktop-версий, при которых bundle совместим (защита от рассинхрона main↔renderer). Любое изменение IPC-контракта (src/types/electron.d.ts) или бизнес-DTO → бамп major вpackage.json/version+ новыйcompatMain=">=<major>". Старые.exeтогда не подтянут несовместимый bundle.- sha256 обязателен: без него подмена zip через MitM теоретически возможна (bucket публичный). С sha256 атакующему нужно подменить и zip, и manifest одновременно.
- Атомарный install: распаковка в staging-директорию с PID-суффиксом →
renameSyncна финальный путь. Pointercurrent.jsonтоже пишется через.tmp+ rename — никаких полузаписанных состояний. - Все ошибки глотаются. Если manifest битый / sha не совпал / нет сети —
resolveIndexPathотдаёт предыдущий установленный bundle, или embeddeddist/из.asar. Касса/POS не падают на старте из-за проблем CDN. - Триггер CI: каждый merge в
main, который меняет renderer-код (src/**,public/**, конфиги сборки). Изменения вelectron/**требуют desktop-релиза — web-job на них не срабатывает.
4.3. Разделение каналов
| Что меняется | Канал | Почему |
|---|---|---|
| Renderer-код (компоненты, стили, бизнес-логика UI) | web | дёшево и быстро, не требует переустановки |
| IPC-контракт, preload, main-процесс, DDL/SQL POS | desktop | в .exe, без него bundle несовместим (compatMain блокирует) |
Зависимости native-модулей (better-sqlite3 и т.п.) | desktop | модули скомпилированы в .exe под конкретный Electron ABI |
5. Сквозные потоки данных
5.1. Изменение номенклатуры (Офис → POS)
Офис Касса POS │ │ │ │ 1. Изменение сущности │ │ │ (version = 5) │ │ │ 2. Транзакция сервиса: │ │ │ ├─ UPDATE (version = 5) │ │ │ └─ INSERT outbox_events │ │ │ (payload включает │ │ │ subdivision_id) │ │ │ │ │ │ 3. Outbox Scheduler │ │ │ └─ Producer ──Kafka──► │ │ │ (CloudEvent с │ │ │ version: 5) │ │ │ │ 4. Consumer │ │ │ ├─ INSERT inbound_events │ │ │ │ (revision = 124, │ │ │ │ sync_to_pos = true, │ │ │ │ subdivision_ids[]) │ │ │ ├─ handler.handle(): │ │ │ │ Check version │ │ │ └─ Upsert payload │ │ │ │ │ │ 5a. WebSocket push ────────►│ (real-time, терминалам чьи │ │ { events: [...], │ subdivisionIds пересекаются │ │ last_revision: 124 } │ с subdivision_ids события) │ │ │ │ │ ИЛИ │ │ │ │ 5b. Polling GET /sync │ │ │ ?last_synced_revision=123 │ │ ◄──────────────────────────│ │ │ SELECT FROM inbound_events │ │ │ WHERE revision > 123 │ │ │ AND sync_to_pos = true │ │ │ AND subdivision_ids │ │ │ hasSome <terminal> │ │ │ ──────────────────────────►│ │ │ │ │ │ │ 6. Взять event_type из ответа │ │ │ 7. Найти SQL в локальном маппинге POS │ │ │ 8. Выполнить SQL в локальной БД │ │ │ 9. last_synced_revision = 1245.2. Создание чека (POS → Офис)
POS Касса Офис │ │ │ │ 1. Бизнес-операция │ │ │ │ │ │ 2. POST /api/receipts ────► │ │ │ Idempotency-Key: <uuid> │ │ │ (или pending_requests │ │ │ если офлайн) │ │ │ │ 3. Проверить Idempotency-Key│ │ │ Новый → выполнить: │ │ │ Transaction: │ │ │ ├─ INSERT receipts │ │ │ ├─ UPDATE cash_shifts │ │ │ └─ INSERT outbox_events │ │ │ Сохранить результат с Key│ │ │ │ │ │ 4. Outbox Scheduler │ │ │ └─ Producer ──Kafka──► │ │ │ │ 5. Consumer │ │ │ └─ Upsert by UUIDОдна REST-операция кассы может породить несколько outbox-событий в одной транзакции. Пример — POST /api/cash-shifts/:id/close (LOCALIKASSA-298): касса считает расхождение actualCashAmount против расчётного остатка и при превышении Corporation.cashShiftDiscrepancyThreshold дополнительно создаёт CashShiftDocument (CASH_IN/CASH_OUT) по системному DWT-типу — в итоге публикуются cash_shift.closed плюс опционально deposit_withdrawal_operation.performed (source: 'POS_AUTO_DISCREPANCY'). Детали — в IMPLEMENT.md кассы (§ 8.6).
5.3. Офлайн-сценарий
POS (офлайн) Касса Офис │ │ │ │ Нет сети │ │ │ Работает автономно │ │ │ Запросы копятся в │ │ │ pending_requests │ │ │ (с Idempotency-Key) │ │ │ │ │ │ ── Сеть восстановлена ── │ │ │ │ │ │ ШАГ 1: Догнать пропущенное │ │ │ GET /api/inbox? │ │ │ last_synced_revision=124 ►│ │ │ │ SELECT FROM │ │ │ inbound_events │ │ │ WHERE revision > 124 │ │ │ AND sync_to_pos=true│ │ │ AND subdivision_ids │ │ │ hasSome <terminal>│ │ ◄──────────────────────────│ │ │ │ │ │ Применение SQL из маппинга │ │ │ last_synced_revision = 189 │ │ │ │ │ │ ШАГ 2: Отправить отложенные│ │ │ POST (с Idempotency-Key)──►│ Обработка │ │ Пометка SENT │ (дубликаты │ │ │ отсекаются по Key) │ │ │ │ │ ШАГ 3: Восстановить WS │ │ │ WS CONNECT ────────────────►│ │ │ ◄── WS CONNECTED │ │6. Ключевые архитектурные паттерны
-
Transactional Outbox — офис и касса используют outbox-паттерн: бизнес-операция и запись события выполняются в одной транзакции, что гарантирует at-least-once доставку без распределённых транзакций. POS использует очередь отложенных REST-запросов (
pending_requests). -
Idempotent Consumer — проверка
versionна стороне кассы предотвращает применение устаревших или повторных событий. Еслиversionвходящего события ≤ текущей в БД — событие получает статус SKIPPED. -
Idempotency-Key — все мутирующие запросы POS → Касса содержат заголовок
Idempotency-Key. Касса хранит ключ + результат 48 часов. Повторный запрос с тем же ключом возвращает сохранённый ответ без повторного выполнения. Критично при нестабильной связи. -
Версионирование (version) — каждая синхронизируемая сущность в офисе хранит поле
version, инкрементируемое при каждом изменении. Передаётся в CloudEvents и в/api/inbox. Используется для идемпотентной обработки на кассе и POS. -
Revision-based синхронизация —
inbound_eventsна кассе имеет автоинкрементныйrevision. POS запрашивает изменения поrevision > last_synced_revision. Это позволяет POS догонять пропущенные события после офлайна. -
CloudEvents — стандартизированный формат сообщений в Kafka. Каждое сообщение содержит метаданные (specversion, type, source, id, time, subject) и данные (
data.version+data.payload). -
Локальная схема POS (in-bundle DDL) — POS носит DDL локальной SQLite и маппинг
event_type → SQLвнутри собственного бандла (registry.ts+schema.ts). Касса схему по сети не отдаёт. Доставка новых полей/таблиц — через релиз нового бандла POS (web- или desktop-канал, см. раздел 4); версионирование —PRAGMA user_version+ append-onlyMIGRATIONS. -
Касса — единственная точка контакта для POS — POS общается только с кассой.
/api/inboxотдаётся из БД кассы (не проксируется в офис). Операции (чеки, смены) отправляются на кассу. Офис не знает про POS. -
Dual transport, single handler — POS получает события через два канала (WebSocket и
/api/inbox), но обрабатывает их одной функциейprocessEvents(). WebSocket — основной real-time канал,/api/inbox— recovery после обрыва. -
Offline-first (POS) — POS работает автономно, накапливая REST-запросы в
pending_requests(сIdempotency-Key). При восстановлении связи: 1) догнать через/api/inbox, 2) отправить отложенные запросы, 3) восстановить WebSocket. -
Конвенция
event_type— полный тип события формируется по формулеprefix + entity_type + "." + action. Префикс зависит от источника:core.company.office.(офис),core.company.kassa.(касса). Сами префиксы и каталог известныхevent_typeживут в общем пакете@locali/office-contracts(OFFICE_EVENT_TYPE_PREFIX,KASSA_EVENT_TYPE_PREFIX,EVENT_TYPES.*). Конвенция едина для Kafka-сообщений и ответа/api/inbox. -
Двухканальный auto-update POS — клиент POS обновляется через два независимых канала S3: desktop (
.exe+electron-updater, редкие релизы) и web (renderer-bundle.zip+ sha256 +compatMain, быстрые UI-фиксы). Разделение в разделе 4. -
Фильтрация событий для POS — не все события из Kafka предназначены для POS. Поле
sync_to_posвinbound_eventsопределяет, нужно ли событие терминалам./api/inboxи WebSocket push отдают только события сsync_to_pos = true. -
Роутинг по
subdivision_ids— офис передаётdata.subdivision_ids(массив) в CloudEvent каждой сущности. Касса сохраняет массивsubdivisionIdsвinbound_eventsи использует Prisma-операторhasSomeдля фильтрации: в/api/inboxи WebSocket push терминал получает только события, чейsubdivision_idsпересекается с подразделениями терминала (массив из JWT). -
Авторизация через JWT —
corporationIdне передаётся в эндпоинтах, а извлекается из JWT. Супер-админ (User.isAdmin) через эндпоинтswitch-corporationполучает новый JWT с другимcorporationIdи работает с данными нужной корпорации через те же эндпоинты. -
Kafka partition-key per aggregate (LOCALIOFFICE-1003) — каждое outbox-событие публикуется в Kafka с явным ключом по наименьшему осмысленному агрегату (
entityIdилиpayload.employeeId/warehouseId). Это даёт consumer-у кассы FIFO внутри одного агрегата без сравнения timestamp’ов и оверхеда на re-ordering. Стратегия — в §1.8. Число партиций топиковoffice.events/kassa.eventsнельзя менять без миграции на новый топик: после resize старые ключи попадут в другие партиции и нарушится FIFO.