Перейти к содержимому

Архитектура распределённой системы синхронизации данных (Офис ↔ Касса ↔ POS)

Источник истины. Единый кросс-системный документ архитектуры Locali (свод office + kassa). Не копировать в репозитории сервисов — ссылаться отсюда. Ссылки на код даны относительно репозитория соответствующего сервиса.

Система состоит из трёх основных компонентов, взаимодействующих через Kafka и REST API, с использованием Outbox-паттерна, CloudEvents-протокола и revision/version-based синхронизации.

Общая схема взаимодействия

┌──────────┐ Kafka ┌──────────┐ REST API / WebSocket ┌──────────┐
│ │ ────────────────────► │ │ ────────────────────────────►│ │
│ ОФИС │ nomenclature, │ КАССА │ /api/inbox, │ POS │
│ │ employee │ (Kassa) │ WebSocket push │(Терминал)│
│ │ ◄──────────────────── │ │ ◄────────────────────────────│ │
└──────────┘ Kafka └──────────┘ POST /api/receipts и др. └──────────┘
│ Касса — единственная точка контакта для POS.
│ /api/inbox отдаётся из БД кассы (не проксируется в офис).
│ DDL локальной SQLite и маппинг event_type → SQL живут
│ внутри бандла POS (registry.ts) — касса схему по сети не отдаёт.

Авторизация и корпорации

corporationId не передаётся в эндпоинты — берётся из JWT текущего пользователя. У User есть флаг isAdmin: супер-админу доступен эндпоинт switch-corporation, который выдаёт новый JWT с нужным corporationId, после чего админ получает данные другой корпорации через те же эндпоинты.


Конвенция имён Kafka-топиков

Имена топиков — иерархические, разделитель — точка. Базовый шаблон: <сервис>.events, при необходимости — <сервис>.events.<подтип>. Имена топиков, на которые опирается приложение:

  • office.events — исходящие события офиса (константа OFFICE_OUTBOX_TOPIC в src/common/ports/outbox.port.ts).
  • kassa.events — входящие события кассы для офиса (env KAFKA_INBOUND_TOPIC, дефолт — kassa.events).

Запрещено:

  • Использовать дефис (-) как разделитель (office-events, kassa-events и т.п.).
  • Смешивать . и _ в одном имени.

Почему именно так. Kafka в JMX-метриках заменяет и ., и _ на _, поэтому смесь разделителей в разных топиках даёт коллизию имён метрик. Дефис как разделитель уровней допустим в Kafka, но конвенция в проекте — точка, чтобы имена единообразно «сворачивались» в иерархию (<сервис>.events.<подтип>) и не пересекались с дефисами внутри <сервис> или <подтип>.

При добавлении нового топика — следовать этой схеме. Расхождение имени топика на стороне продюсера и консьюмера (например, продюсер пишет в service-events, консьюмер слушает service.events) приводит к молчаливой потере событий — менять имя нужно скоординированно с продюсером.

Конвенция CloudEvents source

Поле source в CloudEvents-конверте идентифицирует сервис-продюсер. Шаблон — <сервис>-service (дефис как разделитель — это идентификатор сервиса, не имя топика):

  • office-service — события, опубликованные офисом (литерал зашит в CloudEventBuildersrc/common/integration/cloud-event.builder.ts).
  • kassa-service — события, опубликованные кассой.

Значение source используется консьюмером для фильтрации/маршрутизации и должно меняться скоординированно между продюсером и консьюмером.


1. ОФИС (Office)

Офис — источник истины для справочных данных (номенклатура, сотрудники и т.д.). Он генерирует события при изменении данных и публикует их в Kafka через Outbox-паттерн.

1.1. Генерация событий

Application-сервис пишет в outbox_events напрямую в той же транзакции, что и бизнес-операцию. Не через EventListener — это гарантирует атомарность:

const entity = await this.transactionManager.run(async (tx) => {
const updated = await this.repository.update(id, { ...data, version: current.version + 1 }, tx);
await this.outboxRepository.create({
eventType: 'core.company.office.nomenclature.updated',
entityType: 'nomenclature',
entityId: id,
action: 'updated',
payload: OutboxMapper.toPayload(updated),
status: OutboxStatus.PENDING,
}, tx);
return updated;
});
// Доменное событие — ПОСЛЕ транзакции (для внутренних обработчиков, не для outbox)
await this.eventEmitter.emitAsync('nomenclature.updated', ...);

Важно: EventEmitter2.emitAsync() вызывается после transactionManager.run() — для внутренних подписчиков (инвалидация кэша, CQRS-саги и т.д.). Outbox-запись делается внутри транзакции сервисом, не слушателем.

1.2. Таблица outbox_events

┌─────────────────────────────────────────┐
│ outbox_events │
├─────────────┬───────────────────────────┤
│ PK │ UniqueID │
├─────────────┼───────────────────────────┤
│ Event │ event_type │
│ metadata │ entity_type │
│ │ entity_id │
├─────────────┼───────────────────────────┤
│ Полный │ payload (JSON) │
│ payload │ status │
│ Статус │ last_error │
│ публикации │ attempts │
│ │ created_at │
│ │ published_at │
└─────────────┴───────────────────────────┘

1.3. Конвенция именования event_type

Полный event_type формируется по конвенции: prefix + entity_type + . + action.

Каждый источник имеет свой префикс:

  • Офисcore.company.office.
  • Кассаcore.company.kassa.

Примеры:

  • entity_type: "nomenclature", action: "updated"core.company.office.nomenclature.updated
  • entity_type: "employee", action: "created"core.company.office.employee.created
  • entity_type: "receipt", action: "created"core.company.kassa.receipt.created
  • entity_type: "cash_shift", action: "closed"core.company.kassa.cash_shift.closed
  • entity_type: "attendance", action: "opened"core.company.kassa.attendance.opened
  • entity_type: "attendance", action: "closed"core.company.kassa.attendance.closed

Эта конвенция едина для:

  • Kafka-сообщений (поле type в CloudEvents)
  • Ответа /api/inbox (касса отдаёт event_type готовым, POS использует его как ключ роутинга в локальный маппинг handler-ов)

Префиксы (OFFICE_EVENT_TYPE_PREFIX, KASSA_EVENT_TYPE_PREFIX) и каталог известных event_type (EVENT_TYPES.*) — в пакете @locali/office-contracts. Касса импортирует константы из пакета, у себя локально префиксные строки не объявляет. Офис фиксирует канонические значения в src/common/ports/outbox.port.ts (OFFICE_EVENT_TYPE_PREFIX, OFFICE_OUTBOX_TOPIC).

Миграция: текущий код использует устаревший формат event names (organization.subdivision.changed, accounting.deposit-withdrawal-type.changed). При переходе на Outbox-паттерн все event_type должны быть приведены к конвенции выше. entity_typesnake_case, action — конкретное действие (created/updated/deleted), не changed.

1.4. Версионирование сущностей (version)

Каждая синхронизируемая сущность в офисе хранит поле version — версию сущности, которая увеличивается при каждом изменении:

┌────────────────────┐
│ subdivisions │ (пример любой синхронизируемой сущности)
├──────┬─────────────┤
│ PK │ UniqueID │
├──────┼─────────────┤
│ │ ... │ (бизнес-поля)
│ │ version │ ◄── версия сущности (инкрементируется при каждом изменении)
└──────┴─────────────┘

Это поле:

  • Хранится внутри payload сущности. В CloudEvents-сообщениях извлекается из payload на уровень data.version (не дублируется — в data.payload поле version не повторяется)
  • Используется для идемпотентной обработки на стороне получателя (касса): если входящий version ≤ текущего в БД кассы — событие пропускается (SKIPPED)

Помимо version, офис передаёт в CloudEvent data.subdivision_idsмассив ID подразделений, которым принадлежит сущность (одна сущность может относиться к нескольким подразделениям). Касса сохраняет subdivisionIds: String[] в inbound_events и использует массив для роутинга события терминалам (см. разделы 2.2, 2.5, 2.8). Поле передаётся на верхнем уровне data (рядом с version и payload), а не внутри payload.

1.5. Outbox-паттерн (публикация в Kafka)

outbox_events ──► Outbox Scheduler ──► Producer ──► Kafka broker
(Topic: office.events)
Жизненный цикл статуса:
PENDING ──► PUBLISHING ──► PUBLISHED
├──► PENDING (retry с exponential backoff, attempts++)
└──► FAILED (терминал: attempts >= MAX_ATTEMPTS)

Outbox Scheduler периодически опрашивает outbox_events, атомарно claim-ит PENDING-записи (UPDATE ... WHERE status='PENDING' RETURNING attempts — при нескольких инстансах publisher-а событие возьмёт ровно один), отправляет через publishOrThrow в Kafka. При успехе — PUBLISHED; при ошибке конкретного события — возврат в PENDING с exponential backoff, после MAX_ATTEMPTS — терминальный FAILED. Ошибки уровня соединения с брокером попытку не сжигают: событие возвращается в PENDING без учёта attempt, тик прерывается. При KAFKA_ENABLED != true публикация не запускается — события остаются в PENDING. Зависшие PUBLISHING-записи раз в минуту возвращает в PENDING cron resetStuckPublishing (исчерпавшие лимит — сразу в FAILED).

FIFO-гейт по агрегату (LOCALIOFFICE-1109, К2). Выборка PENDING-батча исключает событие, если у того же partition_key существует более раннее (по created_at, tie-break id) НЕопубликованное событие (NOT EXISTS в findPendingBatch, partial-индекс outbox_events_unpublished_partition_key_idx). Агрегат публикуется строго в порядке создания или стоит целиком: backoff/FAILED «головы» блокирует «хвост», ретраи не переупорядочивают события внутри Kafka-партиции. Тик публикатора дополнительно сериализован между репликами через Postgres advisory lock (pg_try_advisory_xact_lock, фиксированный ключ; занятый лок — тик пропускается).

Admin replay (LOCALIOFFICE-1109, К8). FAILED outbox-события и FAILED/DEAD inbound-записи возвращаются в работу без ручного SQL: GET /api/admin/sync/{outbox|inbound}/failed, POST .../failed/:id/requeue, POST .../failed/requeue-by-entity-type (только для администраторов). Метрики синхронизации: гейджи office_outbox_events / office_inbound_events (бэклог по статусам), office_outbox_stalled_aggregates (агрегаты, чьё старейшее неопубликованное событие старше 60 с), office_outbox_oldest_pending_age_seconds / office_inbox_oldest_pending_age_seconds (возраст старейшей необработанной записи), counter office_contract_schema_drift_total (входящий CloudEvent объявил dataschema с версией @locali/office-contracts новее локальной; сам исходящий dataschema пишет publisher).

1.6. Приём событий (Consumer)

Офис имеет Consumer (src/kassa-sync/), получающий события из Kafka-топика kassa.events и применяющий их к офисным сущностям через таблицу inbound_events (идемпотентность по (entityType, entityId, kafkaEventId)). «Битые» сообщения (не-JSON, невалидный envelope: не-UUID id/subject, отсутствие data, переполнение полей) consumer пропускает с error-логом и коммитом offset-а — они не должны блокировать партицию; инфраструктурные ошибки (БД недоступна при INSERT) пробрасываются — offset не коммитится, Kafka повторяет доставку, событие не теряется. BullMQ-очередь kassa-inbound обрабатывает PENDING-записи: KassaInboundProcessor в одной транзакции вызывает KassaEventDispatcher.apply(record, tx) и inboundRepo.markFinalized(...). На исключение — FAILED + nextRetryAt, забор fallback-cron’ом. Событие неизвестного типа (рассинхрон версий касса/офис при деплое) не финализируется как SKIPPED, а откладывается (deferRetry: возврат в PENDING, nextRetryAt +1 час, attempts сбрасываются) — применится после деплоя обработчика. Исключение — типы, которые офис осознанно не потребляет (IGNORED_KASSA_EVENT_TYPES в KassaEventDispatcher: легаси receipt.created/receipt.updated вне контракта): они финализируются терминальным SKIPPED сразу, иначе вечно держали бы алерт возраста inbound-бэклога (LOCALIOFFICE-1111); CI-проверка контрактов требует, чтобы каждый kassa-тип пакета был либо обработан, либо явно перечислен в этом списке. Ожидание «родителя» при out-of-order доставке (handler бросил DependencyMissingError: например, receipt.fiscalized раньше order.*, receipt.returned раньше оригинала) attempts не сжигает — запись откладывается с растущей задержкой и уходит в терминальный DEAD только когда её возраст превысил 24 часа (LOCALIOFFICE-1109, К3); обычные ошибки — по-прежнему 5 попыток → DEAD.

Реестр обработчиков (src/kassa-sync/application/handlers/kassa-event.dispatcher.ts):

Event typeНазначениеРеализация
core.company.kassa.cash_shift.opened/closedUPSERT кассовой смены (владелец смены в офисе — Subdivision, см. LOCALIOFFICE-982; posTerminalId payload-а сохраняется как audit-snapshot, subdivisionId резолвится из БД-снэпшота терминала после cross-tenant guard. Partial unique cash_shifts_one_open_per_subdivision гарантирует не более одной OPEN-смены на подразделение.)ACL (pos/application/event-handlers/cash-shift-sync.handler.ts, интерфейс kassa-sync/domain/acl/cash-shift-inbound.acl.ts)
core.company.kassa.order.created/updated/cancelled/closedSnapshot-upsert операционного заказа и его позиций (LOCALIOFFICE-1114): на каждый значимый переход касса публикует полный снимок заказа, все четыре типа применяются одинаково. Out-of-order-защита — по монотонно растущему version заказа кассы (PosOrder.kassaVersion); subdivisionId резолвится из БД-снэпшота терминала после cross-tenant guard; смена ещё не материализована — DependencyMissingError, inbound-механика ждёт. Статусы кассы шире офисных enum-ов — явный маппинг в handler-е (OPEN→DRAFT, SENT_TO_KITCHEN→COOKING, CLOSED→FISCALIZED; позиции: ORDERED→PENDING, SENT/IN_PROGRESS→COOKING, READY/SERVED→READY). Ранний локальный тип pos_order.snapshot (LOCALIOFFICE-1050) касса больше не публикует — обработчик удалён.ACL (pos/application/event-handlers/pos-order-sync.handler.ts, интерфейс kassa-sync/domain/acl/pos-order-inbound.acl.ts)
core.company.kassa.payment.createdUPSERT платежаACL (pos/application/event-handlers/pos-payment-sync.handler.ts, интерфейс kassa-sync/domain/acl/payment-inbound.acl.ts)
core.company.kassa.attendance.opened/closedДоменная логика staffing (audit, расчёт, outbox)ACL (staffing/application/event-handlers/attendance-sync.handler.ts, интерфейс kassa-sync/domain/acl/attendance-inbound.acl.ts)
core.company.kassa.receipt.fiscalizedЕдинственное событие чека: материализация PosReceipt с позициями + фискальные реквизиты (ФПД, № ФД, серийник ФН, рег. номер ККТ, snapshot реквизитов юрлица и точки продаж, опциональный контакт покупателя по 54-ФЗ, теги ФФД позиций) — LOCALIOFFICE-1052. Операционные поля (openedAt/waiterId/salesPointId) берутся из связанного PosOrder по orderId. Out-of-order (фискальное событие раньше cash_shift.opened или order.*) — DependencyMissingError, inbound-механика ждёт.ACL (pos/application/event-handlers/pos-receipt-sync.handler.ts, интерфейс kassa-sync/domain/acl/receipt-inbound.acl.ts)
core.company.kassa.cash_shift.fiscal_openedФискальное открытие смены: рег. номер ФН и № ФД отчёта об открытии — LOCALIOFFICE-1052.ACL (pos/application/event-handlers/cash-shift-sync.handler.ts, интерфейс kassa-sync/domain/acl/cash-shift-inbound.acl.ts)
core.company.kassa.cash_shift.fiscal_z_reportedZ-отчёт: № ФД отчёта о закрытии смены — LOCALIOFFICE-1052. Смена без zReportFdNumber при status = CLOSED считается «не закрытой фискально».ACL (pos/application/event-handlers/cash-shift-sync.handler.ts, интерфейс kassa-sync/domain/acl/cash-shift-inbound.acl.ts)
core.company.kassa.receipt.returnedUPSERT возвратного фискального документа в отдельную сущность PosReturnReceipt + PosReturnReceiptItem с обязательным per-item FK originalReceiptItemId — LOCALIOFFICE-1063 / LOCALIKASSA-312. Cross-tenant guard через originalReceipt.cashShift.corporationId; out-of-order (возврат раньше receipt.fiscalized оригинала) — handler выкидывает ошибку, inbound-механика ретраит. RevenueReportEntry со знаком минус строится on-demand через HTTP process-return-receipt/:id (паттерн идентичен продажам). Обратные проводки на закрытии смены подхватываются автоматически: SQL агрегата findCashShiftPaymentBreakdown UNION-ом объединяет sales (все pos_payments смены, после выпиливания discriminator-колонки operation_type) и returns из pos_return_receipts с пропорциональным распределением final_amount по методам оплаты оригинала.ACL (pos/application/event-handlers/pos-return-receipt-sync.handler.ts, интерфейс kassa-sync/domain/acl/return-receipt-inbound.acl.ts)

Все handler-ы живут у владельца агрегата в <module>/application/event-handlers/ и регистрируются через ACL-токен в KassaEventDispatcher (см. IMPLEMENT.md §9 «Inbound event-handlers»). На каждое успешно применённое inbound-событие handler возвращает DeferredDomainEvent[]EventEmitter2-события публикуются процессором после коммита inbound-транзакции, что открывает точку подписки @OnEvent для других модулей (accounting, inventory, reporting).

1.7. Формат сообщения (CloudEvents protocol)

Envelope собирается единым CloudEventBuilder (src/common/integration/cloud-event.builder.ts) — литералы specversion: '1.0', source: 'office-service', datacontenttype: 'application/json' зашиты в нём.

Префикс type (core.company.office) и имя Kafka-топика — каноничные константы OFFICE_EVENT_TYPE_PREFIX и OFFICE_OUTBOX_TOPIC в src/common/ports/outbox.port.ts. При смене конвенции править там, не в публикаторе.

id события генерируется адаптером (randomUUID) и переиспользуется как CloudEvent.id — стабильный идемпотентный ключ для consumer’а между ретраями.

{
"specversion": "1.0",
"type": "core.company.office.nomenclature.updated",
"source": "office-service",
"id": "uuid-event-id",
"time": "2028-02-25T10:00:00Z",
"subject": "uuid-nomenclature-id",
"datacontenttype": "application/json",
"data": {
"version": 5,
"subdivision_ids": ["uuid-subdivision-1", "uuid-subdivision-2"],
"payload": {
"id": "uuid-nomenclature-id",
"name": "Новый Цезарь",
"price": 550.00,
"is_available": true
// ... все поля сущности (version и subdivision_ids НЕ дублируются в payload)
}
}
}

Ключевые поля:

  • data.version — версия сущности. Используется для идемпотентной обработки: если входящий version ≤ текущего в БД — событие пропускается (SKIPPED)
  • data.subdivision_ids — массив ID подразделений, к которым относится сущность. Касса использует массив для роутинга события нужным терминалам
  • subject — ID изменённой сущности
  • type — полный тип события (домен + действие)

1.8. Стратегия Kafka partition-key (LOCALIOFFICE-1003)

Каждое outbox-событие публикуется в Kafka с явным partition-key. Сообщения с одинаковым ключом гарантированно попадают в одну партицию и читаются consumer-ом строго в порядке отправки. Без ключа KafkaJS распределяет сообщения round-robin — связанные события (например, attendance.openedattendance.closed одного сотрудника) могут переставляться между партициями, и инбокс-консьюмер на стороне кассы вынужден чинить порядок ретраями.

Ключ резолвится по бизнес-агрегату — «наименьшему осмысленному ordering scope»:

entityTypepartition-keyИсточникЧто гарантирует
corporationentityIdid сущностипорядок настроек одной корпорации
subdivisionentityIdid сущностипорядок изменений одного подразделения
sales_pointentityIdid сущностипорядок изменений одной точки продаж
pos_terminalentityIdid сущностипорядок изменений одного терминала
employeeentityIdid сущностиhire/update/dismiss одного сотрудника по порядку
positionentityIdid сущностипорядок правок одной должности
attendance_typeentityIdid сущностипорядок правок одного типа явки
payment_methodentityIdid сущностипорядок правок одного метода оплаты (включая состав привязок к подразделениям — он едет в payload.subdivisionIds, LOCALIOFFICE-1042/LOCALIOFFICE-1062)
payment_systementityIdid сущностипорядок правок одной платёжной системы (глобальный справочник, fanout по корпорациям, LOCALIOFFICE-1042)
nomenclatureentityIdid сущностипорядок правок одной позиции
nomenclature_groupentityIdid сущностипорядок правок одной группы
nomenclature_modifierentityIdid сущностипорядок правок одного модификатора
tech_cardentityIdid сущностипорядок правок одной техкарты
shift_typeentityIdid сущностипорядок правок одного шаблона смены
attendancepayload.employeeIdpayloadвсе явки одного сотрудника по порядку (включая узкие cancelled / force_closed, LOCALIOFFICE-1014)
schedule_entrypayload.employeeIdpayloadвесь график одного сотрудника по порядку
payment_ratepayload.employeeIdpayloadвсе ставки одного сотрудника по порядку
warehouse_stockpayload.warehouseIdpayloadвсе изменения остатков одного склада по порядку

Резолвер — src/outbox/domain/services/outbox-partition-key.resolver.ts. Чисто синхронный, exhaustive по OutboxEntityType (новый тип без обновления резолвера ловится TypeScript-never-assert ещё на этапе компиляции). Резолвер вызывается в PrismaOutboxAdapter.record(...), результат персистится в столбце outbox_events.partition_key и затем передаётся в kafka.publishOrThrow(..., { key }) шедулером — это даёт стабильный ключ при ретраях публикации.

Что НЕ решает partition-key стратегия. Кросс-сущностный порядок (например, cash_shift.opened должен прийти до attendance.opened с тем же cashShiftId) не гарантируется: события разных типов идут с разными ключами и могут попасть в разные партиции. Это сознательный размен — за ordering между неоднородными агрегатами отвечает идемпотентность инбокса кассы + retry с экспоненциальным backoff’ом. Цель partition-key — устранить рассогласования внутри одного агрегата, где ретрай маскировал бы реальную деградацию.

Эксплуатационные ограничения. Стандартный Kafka-партиционер использует hash(key) mod partitions. Менять число партиций топика office.events без перераспределения существующих сообщений нельзя — после resize старые ключи попадут в другие партиции, и in-flight события того же агрегата окажутся раскинуты между двумя партициями, нарушая FIFO. Корректный сценарий resize: создать новый топик, переключить producer на него, дождаться дренажа старого, удалить старый. Число партиций фиксируется в конфигурации кластера и в @locali/office-contracts README на стороне команды кассы.

Регистрация новой OutboxEntityType. Расширяя OUTBOX_ENTITY (см. IMPLEMENT.md §8), нужно:

  1. Добавить новый case в OutboxPartitionKeyResolver — выбрать наименьший осмысленный агрегат.
  2. Обновить таблицу выше.
  3. Если новая сущность ссылается на существующий агрегат (как attendance → employee) — переиспользовать тот же ключ, чтобы оба домена ехали в одну партицию.

2. КАССА (Kassa)

Облачный микросервис между офисом и POS-терминалами. Принимает события из Kafka, обрабатывает их, предоставляет REST API и WebSocket для POS, генерирует свои события. Касса является единственной точкой контакта для POS и центром управления схемой и логикой для терминалов.

Почему касса — отдельный сервис:

  1. Разделение ответственности — офис не знает про POS, маппинги, схемы терминалов
  2. Независимость деплоя — касса и офис обновляются и масштабируются независимо
  3. Независимая точка отказа — офис может быть временно недоступен, касса продолжает отдавать POS данные из своей БД
  4. Собственная логика — уведомления, агрегация, обработка чеков/смен. Не нагружает офис ненужной ответственностью
  5. Подготовка к будущему — проще заложить сейчас, чем выносить из офиса позже

2.1. Приём событий из Kafka (Consumer)

Kafka ──► Consumer ──► Processing for main entities ──► Сохранение в БД кассы

Kafka-consumer слушает topic (по умолчанию office.events), парсит CloudEvent и передаёт в InboundEventService.process(). Проверка version выполняется не в общем сервисе, а в обработчике сущности (InboundEventHandler.handle()) — каждый обработчик сам сравнивает входящий version с текущей в БД и возвращает APPLIED или SKIPPED. Upsert сущности и обновление статуса записи в inbound_events происходят в одной транзакции.

Обработка входящих событий:

┌─────────────────────────────────────────────┐
│ Processing for main entities │
│ │
│ 1. Save metadata and payload │
│ in inbound_events │
│ (revision назначается автоматически) │
│ │ │
│ ▼ │
│ 2. event.data.version ≤ │
│ current_db_version? │
│ │YES │NO │
│ ▼ ▼ │
│ SKIPPED 3. upsert payload │
│ │ │
│ ▼ │
│ 4. update status │
│ (applied/failed) │
│ for inbound_events │
└─────────────────────────────────────────────┘

Проверка version обеспечивает идемпотентность — если пришло событие со старой или равной версией, оно пропускается (SKIPPED).

2.2. Таблица inbound_events

Основа для /api/inbox. Каждое входящее событие получает автоинкрементный revision, который POS использует для догоняющей синхронизации.

┌─────────────────────────────────────────┐
│ inbound_events │
├─────────────┬───────────────────────────┤
│ PK │ UniqueID │
├─────────────┼───────────────────────────┤
│ Идемпот. │ kafka_event_id (UNIQUE) │ ◄── CloudEvent.id; дубль доставки отсекается БД
├─────────────┼───────────────────────────┤
│ Event │ event_type │ ◄── полный тип: "core.company.office.nomenclature.updated"
│ metadata │ entity_type │ ◄── "nomenclature"
│ │ entity_id │
│ │ version │
│ │ subdivision_ids (String[])│ ◄── массив из data.subdivision_ids, для роутинга
├─────────────┼───────────────────────────┤
│ Полный │ payload │
│ payload │ status │
│ Retry │ attempts │ ◄── счётчик попыток (инкремент при claimForProcessing)
│ │ last_error │ ◄── sanitized сообщение последней ошибки
│ │ next_retry_at │ ◄── момент следующей попытки (exponential backoff)
│ │ created_at │
│ │ processed_at │
├─────────────┼───────────────────────────┤
│ Sync │ revision (autoincrement) │ ◄── глобальный автоинкремент для /api/inbox
│ │ sync_to_pos │ ◄── boolean: нужно ли отдавать в /api/inbox
└─────────────┴───────────────────────────┘
Жизненный цикл статуса:
RECEIVED ──► PROCESSING ──► APPLIED ─── терминал
├─► SKIPPED ─── терминал (старая версия / битый payload;
│ «нет handler-а» — НЕ терминал: deferRetry возвращает
│ в RECEIVED с nextRetryAt +1 час до деплоя обработчика;
│ битый CloudEvent-envelope отсекается consumer-ом
│ ещё до записи в inbound_events)
└─► FAILED ──┐
│ scheduler retry (next_retry_at <= now AND attempts < 5)
PROCESSING (новая попытка)

kafka_event_idCloudEvent.id входящего сообщения. UNIQUE-индекс гарантирует, что повторная доставка одного и того же CloudEvent не порождает вторую запись в inbound_events. Идемпотентность приёма обеспечивается БД-ограничением, а не сравнением полей в коде.

revision — глобальный монотонно возрастающий счётчик. Каждое входящее событие получает следующее значение. POS запрашивает изменения по revision > last_synced_revision.

sync_to_pos — флаг, определяющий нужно ли событие POS-терминалам. Устанавливается при записи на основе entity_type. Не все события из Kafka предназначены для POS (например, внутренние события бухгалтерии).

subdivision_ids — массив затронутых подразделений, извлекается из data.subdivision_ids CloudEvent (см. §1.4). Используется для фильтрации событий в /api/inbox и WebSocket push через Prisma-оператор hasSome: терминалу отдаются только события, массив subdivision_ids которых пересекается с подразделениями терминала. Корректность fan-out’а (включая попадание события в подразделения, которые потеряли доступ к сущности, и раскрытие «доступен везде») гарантируется источником на стороне офиса — касса доверяет envelope и не дополняет его на receiver-side.

attempts / last_error / next_retry_at — retry-семантика, симметричная outbox_events. На исключении handler-а событие переводится в FAILED с next_retry_at = now + backoff(attempts). Фоновый InboundRetryScheduler раз в 10 секунд подбирает RECEIVED/FAILED события, у которых attempts < 5 AND next_retry_at <= now(), и атомарно переводит их в PROCESSING (один UPDATE ... WHERE status IN (...) — claim защищён от гонки consumer↔scheduler). Раз в минуту тот же scheduler сбрасывает зависшие PROCESSING-записи (старше 15 минут) обратно в FAILED.

Отсутствие родительской реплики справочника (например, событие номенклатуры пришло раньше своей группы) с LOCALIKASSA-399 не уводит событие в FAILED: FK между репликами сняты, дочерняя запись применяется сразу (APPLIED), а недостающая ссылка отслеживается отдельно через kassa_dangling_references (см. § 1.8 и IMPLEMENT.md §7.8). FAILED-retry остаётся для настоящих ошибок обработки (битый payload, недоступность БД).

2.3. Генерация исходящих событий (Transaction)

Касса имеет собственные Prisma-модели (CashShift, Receipt, ReceiptItem, Payment, CashShiftDocument в prisma/schema/sales.prisma) и REST-эндпоинты для POS: POST /api/receipts, POST /api/cash-shifts (open/close), POST /api/cash-shift-documents/{cash-in,cash-out,cashier-change,x-report}. Каждая бизнес-операция кассы и запись outbox — в одной транзакции:

-- Всё атомарно:
INSERT INTO receipts (...);
UPDATE cash_shifts SET ...;
INSERT INTO outbox_events (event_type, entity_type, entity_id, payload, status);

2.4. Outbox на стороне кассы

Структура outbox_events аналогична офисной (topic, channel, eventType, entityType, entityId, terminalId, payload, status, attempts, lastError, nextRetryAt, publishedAt; отдельной колонки partition-key нет — ключ публикации берётся из entityId, см. §1.8), плюс nullable-колонки traceparent / tracestate (LOCALIKASSA-386) — снимок W3C trace-контекста HTTP-запроса терминала, в котором событие было записано. Outbox Scheduler восстанавливает его при публикации и кладёт в Kafka message headers, чтобы трейс оставался единым через асинхронную границу outbox (терминал → касса → офис); NULL — событие записано вне трейса. Свой Outbox Scheduler → Producer публикует события в Kafka: запускается каждые 5 секунд, забирает батч до 100 PENDING-записей на канал.

outbox_events ──► Outbox Scheduler ──► Producer ──► Kafka broker
Статусы: PENDING ──► PUBLISHING ──► PUBLISHED
├──► PENDING (retry с backoff, attempts++;
│ ошибки соединения попытку не сжигают)
└──► FAILED (терминал: attempts >= maxAttempts канала;
реанимация — admin requeue, LOCALIKASSA-405 К8)

FIFO-гейт публикации (LOCALIKASSA-405, К2). findPendingBatch не выбирает событие, пока у той же сущности (entityId в рамках того же topic) есть более раннее (createdAt, tie-break id) НЕопубликованное событие. Без гейта retry с backoff переупорядочивал бы события одной сущности: receipt.updated уезжал бы в офис раньше receipt.created, ушедшего в backoff. Терминальный FAILED блокирует всю сущность до ручного requeue через admin API (POST /api/admin/outbox/failed/:id/requeue, POST /api/admin/outbox/failed/requeue по entityType) — это сознательно: продолжать публикацию хвоста поверх потерянного события нельзя. Число сущностей, чьё старейшее неопубликованное событие старше 60 секунд, видно в gauge kassa_outbox_stalled_aggregates; возраст старейшего неопубликованного события — kassa_outbox_oldest_pending_age_seconds.

2.5. REST API /api/inbox — из БД кассы

Касса отдаёт /api/inbox из собственной БД (таблица inbound_events). Не проксирует в офис.

POS ──► Касса GET /api/inbox?last_synced_revision=N
(Authorization: Bearer <JWT терминала>)
SELECT FROM inbound_events
WHERE revision > N
AND sync_to_pos = true
AND status = 'APPLIED'
AND subdivision_ids && <подразделения терминала> -- hasSome
ORDER BY revision ASC
LIMIT 100

Подразделения терминала касса берёт из JWT-payload POS (subdivisionIds, выдаётся при авторизации через /auth/terminal). Фильтр использует Prisma hasSome, то есть возвращаются события, у которых массив subdivisionIds пересекается с массивом терминала.

Важно: фильтр status = 'APPLIED' исключает SKIPPED-события (старые версии) из ответа. Без него POS мог бы получить устаревшие данные и перезаписать ими актуальные.

Формат ответа /api/inbox

{
"events": [
{
"event_type": "core.company.office.nomenclature.updated",
"entity_id": "uuid-nomenclature-id",
"entity_type": "nomenclature",
"action": "updated",
"payload": {
"id": "uuid-nomenclature-id",
"name": "Новый Цезарь",
"price": 550.00,
"is_available": true,
"version": 5
},
"revision": 124
},
{
"event_type": "core.company.office.employee.created",
"entity_id": "uuid-employee-id",
"entity_type": "employee",
"action": "created",
"payload": {
"id": "uuid-employee-id",
"full_name": "Иванов Иван",
"role": "cashier",
"pin": "1234",
"version": 1
},
"revision": 125
}
],
"last_revision": 125,
"has_more": false
}
  • event_type — полный тип события (prefix.entity_type.action). POS не собирает его из частей — использует as-is
  • events — массив изменений из inbound_events с revision > last_synced_revision, sync_to_pos = true, status = 'APPLIED' и subdivision_ids, пересекающимися с подразделениями терминала, отсортированных по revision ASC
  • last_revision — максимальная ревизия в текущем батче. POS сохраняет её как last_synced_revision
  • has_more — есть ли ещё данные за пределами батча. Если true — POS делает повторный запрос с last_synced_revision=last_revision
  • payload содержит все поля сущности включая version — POS использует поля как параметры для собственных UPSERT-ов в локальную SQLite
  • event_type приходит готовым из кассы — POS использует его напрямую как ключ роутинга в свой локальный маппинг handler-ов

Синхронизируемые сущности

Через inbound_events касса обрабатывает следующие сущности офиса: employee, employee_position, position, subdivision, sales_point, payment_method, deposit_withdrawal_type (LOCALIKASSA-297), nomenclature_group, pos_terminal, nomenclature, modifier, tech_card, tech_card_ingredient, warehouse_stock, corporation, preparation_place (LOCALIKASSA-311). Регистрация обработчиков — декларативно в InboxCoreModule (handler сам объявляет entityType, eventTypes, syncToPos).

Все эти сущности приходят через один Kafka-топик office.events (LOCALIKASSA-248). Отдельных inbound-топиков для каких-либо сущностей касса больше не слушает.

2.6. Синхронизируемые сущности офиса

Через inbound_events касса обрабатывает следующие сущности офиса: employee, employee_position, position, subdivision, payment_method, nomenclature_group, pos_terminal, nomenclature, modifier, tech_card, tech_card_ingredient, warehouse_stock, corporation. Регистрация handler-ов — декларативно в InboxCoreModule кассы (handler сам объявляет entityType, eventTypes, syncToPos). Все эти сущности приходят через один Kafka-топик office.events (LOCALIKASSA-248) — отдельных inbound-топиков для каких-либо сущностей касса больше не слушает.

DDL локальной SQLite POS и маппинг event_type → SQL касса по сети не отдаёт: и то, и другое живёт внутри бандла POS — реестр в electron/modules/sync/registry.ts, служебные таблицы и список миграций — в electron/shared/db/schema.ts (см. раздел 3.4).

Офис дополнительно публикует в outbox сущность sales_point (LOCALIOFFICE-983) — точка продаж внутри подразделения. Контракт уже идёт через OUTBOX_ENTITY.SALES_POINT с subdivisionIds=[entity.subdivisionId], локальный wire-payload — в src/common/ports/outbox-payloads.ts (SalesPointOutboxPayload). Инвариант, который держится в офисе: у каждого Subdivision ровно одна SalesPoint с isDefault=true — её создаёт SubdivisionService.create одной транзакцией с подразделением (LOCALIOFFICE-984), и она же служит fallback-ом при inbound-приёме чека без явного salesPointId. Регистрация DDL/обработчика на стороне кассы (приём sales_point-события и пометка чека выбранной точкой при логине сотрудника) делается отдельной задачей на стороне кассы; до её выкатки sync_to_pos для entity_type=sales_point не выставляется.

Офис также публикует в outbox сущность shift_type (LOCALIOFFICE-1002) — шаблон плановой смены корпорации. Контракт — OUTBOX_ENTITY.SHIFT_TYPE с subdivisionIds=[] (корпоративный справочник, как attendance_type/position), wire-payload — ShiftTypePayload из @locali/office-contracts@4.3.0, локально расширен в src/common/ports/outbox-payloads.ts как ShiftTypeOutboxPayload. Стиль публикации — снапшот: любое изменение (create / update / soft-delete / restore) уезжает одним OUTBOX_ACTION.UPDATED с актуальным payload.isDeleted и payload.version. Сидер шаблонов (002-staffing-defaults.seeder.ts) пишет в БД напрямую через prisma.shiftType.create, минуя ShiftTypeService, поэтому при пересеяле дев-стенда outbox не наполняется (это умышленно). Регистрация DDL/обработчика на стороне кассы — LOCALIKASSA-286; до её выкатки sync_to_pos для entity_type=shift_type не выставляется.

Для attendance офис, помимо обычного attendance.updated со snapshot-payload, публикует два узких события (LOCALIOFFICE-1014) — attendance.cancelled и attendance.force_closed. Они дополняют, а не заменяют UPDATED: на затронутых сценариях из офиса уходит пара событий (полный snapshot для всех подписчиков + узкое — для кассы). Касса подписывается только на узкие события, чтобы не фильтровать шумный поток правок и при этом синхронизировать локальную копию явки (снять partial unique-индекс «одна открытая явка на сотрудника» и/или перевести OPEN→OVERDUE/CLOSED).

  • attendance.cancelled (OUTBOX_ACTION.CANCELLED) — публикуется при soft-delete явки в офисе: отклонение PENDING_PROXY_CARD_APPROVAL (AttendanceService.resolveProxyCard REJECT), отклонение PENDING_OFF_SCHEDULE_APPROVAL (resolveOffSchedule REJECT), обычное удаление (delete), перекрытие реальной явкой авто-явки отсутствия (applyAbsenceOverride). Wire-payload — AttendanceCancelledOutboxPayload в src/common/ports/outbox-payloads.ts (минимум: id, corp, subdivision, employee, reason, version). Причина (AttendanceCancellationReason) даёт кассе контекст: OFF_SCHEDULE_REJECTED / PROXY_CARD_REJECTED / OVERRIDDEN_BY_REAL_ARRIVAL / DELETED_BY_OFFICE.
  • attendance.force_closed (OUTBOX_ACTION.FORCE_CLOSED) — публикуется, когда офис закрыл явку без участия кассы: scheduler autoCloseOverdue (только при source === "OFFICE", причина AUTO_CLOSED_ON_OVERDUE, статус OVERDUE) и увольнение сотрудника с открытой явкой (AttendanceService.autoCloseOnDismissal, вызывается из EmployeeService.dismiss; причина DISMISSED, статус CLOSED). Wire-payload — AttendanceForceClosedOutboxPayload (id, corp, subdivision, employee, status: CLOSED|OVERDUE, reason, version). При inbound-сценариях от кассы (closeFromKassa, autoCloseStaleOpenAttendance с source: "KASSA") узкое событие не публикуется — касса не уведомляет сама себя.

Routing: subdivisionIds: [attendance.subdivisionId] — событие летит только в касса-копию подразделения, владеющего явкой. Partition-key — payload.employeeId (общий резолвер OutboxPartitionKeyResolver для всех вариантов payload). Wire-контракт живёт в @locali/office-contracts@5.1.0 (AttendanceCancelledPayload / AttendanceForceClosedPayload + EVENT_TYPES.attendanceCancelled / EVENT_TYPES.attendanceForceClosed); локально в src/common/ports/outbox-payloads.ts сделан только реэкспорт под историческими именами Attendance*OutboxPayload. DDL/обработчик на стороне кассы — LOCALIKASSA-293.

Механизм работы

  1. Бэкенд кассы регистрирует handler для entity_type в InboxCoreModule (handler сам объявляет eventTypes, syncToPos). Никакой агрегированной DDL-схемы по сети не отдаётся — DDL живёт у POS в electron/modules/sync/registry.ts.
  2. POS получает события (через /api/inbox или WebSocket) и применяет к локальной SQLite EVENT_SQL[event.event_type] с биндом payload. Маппинг и DDL обновляются вместе с бандлом POS (см. раздел 4 про auto-update).
  3. Фронтенд маппит кнопки UI на вызов REST-эндпоинтов кассы (POST /api/receipts и т.п.) — бизнес-логика на стороне POS сводится к минимуму.

2.7. Idempotency-Key — защита от дублирования при нестабильной связи

При нестабильном соединении POS может отправить запрос (создание чека), получить таймаут и не знать, прошёл ли запрос. При повторной отправке без защиты — дубликат чека.

Решение: заголовок Idempotency-Key на всех мутирующих запросах POS → Касса.

┌─────────────────────────────────────────┐
│ idempotency_keys │
├─────────────┬───────────────────────────┤
│ PK │ key (UUID из заголовка) │
├─────────────┼───────────────────────────┤
│ │ response_status (INT) │
│ │ response_body (JSON) │
│ │ created_at │
└─────────────┴───────────────────────────┘

Логика (NestJS IdempotencyInterceptor, зарегистрирован глобально через app.useGlobalInterceptors(...) в main.ts кассы):

  1. POS отправляет POST /api/receipts с заголовком Idempotency-Key: <uuid>
  2. Interceptor пропускает GET-запросы; для POST/PUT/PATCH/DELETE проверяет: есть ли ключ в idempotency_keys?
    • Есть → вернуть сохранённые response_status и response_body (не выполнять повторно)
    • Нет → выполнить запрос, сохранить результат с ключом, вернуть ответ
  3. TTL-очистка: cron каждые 24 часа удаляет записи старше 48 часов

Race condition: key имеет UNIQUE constraint в БД. Если два параллельных запроса с одним ключом пришли одновременно:

  • Первый INSERT выигрывает
  • Второй ловит UniqueConstraintViolation → делает SELECT + ждёт пока первый запишет результат → возвращает его

Важно:

  • Применяется только к мутирующим эндпоинтам (POST/PUT/PATCH/DELETE), не к GET
  • POS генерирует Idempotency-Key при создании pending_request, а не при отправке — так повторная отправка использует тот же ключ
  • Ключ — UUID v4, генерируется на стороне POS

2.8. WebSocket — real-time push для POS

После авторизации и первичной синхронизации POS устанавливает WebSocket-соединение с кассой (Socket.IO, namespace из process.env.WS_NAMESPACE). Это основной канал получения обновлений.

POS ──► WS CONNECT wss://kassa/<namespace>
handshake.auth.token = <JWT терминала> (или ?token=... в query)
◄── событие 'connected'
Касса при получении нового события из Kafka:
1. Consumer обрабатывает событие последовательно (inbound_events + upsert) в транзакции
2. Только если status = APPLIED и syncToPos = true:
3. TerminalConnectionService.pushInboxEvents() рассылает через broadcastToSubdivision('inbox', payload)
каждому подразделению из subdivisionIds события (индекс subdivisionIndex в памяти)
4. Пушит событие 'inbox' через WebSocket в том же формате, что и /api/inbox

Имена WebSocket-событий: inbox (push событий с inbound_events), connected (приветствие после handshake), ping/pong (keep-alive), error, message. На стороне клиента POS подписывается на inbox и вызывает ту же processEvents(), что и для /api/inbox.

Важно: Consumer обрабатывает события последовательно (single-threaded). Это гарантирует, что revision назначаются в порядке обработки и WebSocket push отправляется только после коммита в БД. Без этого возможен пропуск событий при параллельной обработке.

Формат WS-сообщения идентичен ответу /api/inbox:

{
"events": [{ "event_type": "...", "entity_type": "...", "action": "...", "payload": {...}, "revision": 543 }],
"last_revision": 543
}

POS использует одну и ту же функцию для обработки событий из WebSocket и из /api/inbox. WebSocket — основной канал, /api/inbox — recovery после обрыва.

2.9. Авторизация POS

Касса авторизует терминал собственным эндпоинтом:

POS ──► POST /api/auth/terminal
body: { code: string, password: string }
◄── { accessToken, terminalId, subdivisionIds, corporationId }
  • Касса ищет терминал по code в таблице pos_terminals (поля: id, code (unique), name, passwordHash, isActive, subdivisionId FK) и сверяет password через bcrypt (с защитой от timing-атак)
  • В payload JWT кладёт { terminalId, subdivisionIds, corporationId } — в БД у терминала один subdivisionId (FK → Subdivision), в JWT он оборачивается в массив subdivisionIds для единообразия с форматом inbound_events.subdivisionIds
  • expiresIn/refresh-token не возвращаются — access-токен без явного TTL в ответе (TTL задаётся конфигом JWT)
  • Этот JWT используется для /api/inbox (Bearer) и WebSocket (handshake.auth.token или ?token=...)
  • subdivisionIds из JWT — источник истины для фильтрации событий в /api/inbox и для маршрутизации WebSocket push

3. POS (Point of Sale — Терминал)

Клиентское приложение на кассовом терминале с поддержкой offline-first работы. POS хранит локальную SQLite-копию справочников офиса (better-sqlite3 внутри userData) и применяет к ней входящие события самостоятельно — DDL и маппинг event_type → SQL живут внутри бандла POS, касса схему по сети не отдаёт.

3.1. Инициализация — локальная схема из бандла

При первом запуске POS применяет DDL из локального реестра (electron/modules/sync/registry.ts) — CREATE TABLE IF NOT EXISTS + CREATE INDEX IF NOT EXISTS — и проставляет PRAGMA user_version = MIGRATIONS.length. Существующие установки прокручивают MIGRATIONS (append-only массив ALTER/CREATE) начиная с текущего user_version (см. electron/shared/db/schema.ts). Доставка новых полей/таблиц — через релиз нового бандла POS (раздел 4).

3.2. Приём событий — два транспорта, один обработчик

POS получает данные через два канала:

  • WebSocket — основной, real-time push от кассы
  • /api/inbox polling — recovery после обрыва WebSocket или при первичной синхронизации

Оба канала используют одинаковый формат и одну функцию обработки:

function processEvents(data) {
for (event of data.events) {
// event_type приходит готовым из кассы — POS не собирает его из частей
const sql = EVENT_SQL[event.event_type]; // локальный маппинг из registry.ts
if (sql) execute(sql, event.payload); // db.prepare(sql).run(payload)
// Неизвестные event_type (нет ключа в ALLOWED_EVENT_TYPES) — игнорируются
}
updateLastSyncedRevision(data.last_revision);
}
// Источник 1: WebSocket (основной, real-time)
ws.onMessage(msg => processEvents(msg));
// Источник 2: /api/inbox (recovery после обрыва)
async function recover() {
const rev = getLastSyncedRevision();
const data = await fetch("/api/inbox?last_synced_revision=" + rev);
processEvents(data);
if (data.has_more) recover();
}

3.3. Генерация событий (Terminal → Касса)

POS генерирует бизнес-события (создание чеков, закрытие смен и т.д.) и отправляет их напрямую через REST-эндпоинты кассы — каждая бизнес-операция имеет свой эндпоинт на кассе:

POS Касса
│ │
│ POST /api/receipts ────────────► │ (создание чека)
│ Idempotency-Key: <uuid> │
│ POST /api/cash-shifts/close ──► │ (закрытие смены)
│ Idempotency-Key: <uuid> │
│ │
│ │ Касса в транзакции:
│ │ 1. Бизнес-логика (INSERT/UPDATE)
│ │ 2. INSERT outbox_events → Kafka

POS не хранит outbox_events. Вместо этого, при отсутствии сети POS откладывает вызовы REST-эндпоинтов в локальную очередь:

┌─────────────────────────────────────────┐
│ pending_requests (POS) │
├─────────────┬───────────────────────────┤
│ PK │ UniqueID │
├─────────────┼───────────────────────────┤
│ Request │ method │
│ metadata │ url │
│ │ body (JSON) │
│ │ idempotency_key (UUID) │ ◄── генерируется при создании записи
├─────────────┼───────────────────────────┤
│ Статус │ status │
│ │ last_error │
│ │ attempts │
│ │ created_at │
└─────────────┴───────────────────────────┘
Статусы: PENDING ──► SENDING ──► SENT / FAILED

При восстановлении сети POS отправляет отложенные запросы в порядке создания. idempotency_key генерируется при создании записи, что гарантирует что повторная отправка использует тот же ключ.

3.4. Service Worker — синхронизация и офлайн

Service Worker управляет синхронизацией:

┌───────────────────────────────────────────────────────────────┐
│ Service Worker │
│ │
│ СЦЕНАРИЙ 1: Первый вход / первичная инициализация │
│ ───────────────────────────────────────────────── │
│ Применить локальный DDL из бандла POS (registry.ts) │
│ + MIGRATIONS по PRAGMA user_version │
│ │
│ GET /api/inbox (без параметров) │
│ → Полная загрузка всех справочников из БД кассы (батчами) │
│ → Применение EVENT_SQL для каждого события │
│ → Сохранение last_synced_revision из ответа │
│ │
│ │
│ СЦЕНАРИЙ 2: WebSocket (основной режим) │
│ ───────────────────────────────────────────────── │
│ WS CONNECT wss://kassa/ws │
│ → Получение real-time событий │
│ → Применение EVENT_SQL (та же функция processEvents) │
│ → Обновление last_synced_revision │
│ │
│ │
│ СЦЕНАРИЙ 3: Recovery после обрыва WebSocket │
│ ───────────────────────────────────────────────── │
│ │
│ ┌──────────────────────────────────┐ │
│ │ GET /api/inbox? │ │
│ │ last_synced_revision=N │ ◄── из БД кассы │
│ └──────────────┬───────────────────┘ │
│ │ │
│ ▼ │
│ Применение EVENT_SQL │
│ Обновление last_synced_revision │
│ │
│ ┌──────────────────────────────────┐ │
│ │ Отправка отложенных │ │
│ │ pending_requests → касса │ │
│ │ (с Idempotency-Key) │ │
│ │ После успеха: пометка │ │
│ │ статуса SENT │ │
│ └──────────────────────────────────┘ │
│ │
│ WS RECONNECT │
└───────────────────────────────────────────────────────────────┘

3.5. Локальная SQLite POS — служебные и доменные таблицы

DDL живёт внутри бандла POS (electron/shared/db/schema.ts, electron/modules/sync/registry.ts) — касса не отдаёт схему по сети, POS применяет её сам при первом запуске.

Служебные таблицы (создаются первыми)

┌──────────────────────────────┐
│ local_settings │ ◄── key/value: last_synced_revision, last_connected_at и пр.
├──────┬───────────────────────┤
│ PK │ key (TEXT) │
│ │ value (TEXT NOT NULL) │
└──────┴───────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ pending_requests │ ◄── очередь отложенных POST/PUT/PATCH/DELETE
├──────┬──────────────────────────────────────────────────────────┤
│ PK │ id (TEXT) │
│ │ method, url, body, headers (TEXT/JSON) │
│ │ idempotency_key (TEXT NOT NULL) │
│ │ status TEXT NOT NULL DEFAULT 'PENDING' │
│ │ attempts INTEGER NOT NULL DEFAULT 0 │
│ │ last_error TEXT │
│ │ created_at INTEGER NOT NULL DEFAULT (strftime('%s','now'))│
└──────┴──────────────────────────────────────────────────────────┘
+ INDEX idx_pending_status_created (status, created_at)

local_settings хранит как минимум last_synced_revision (обновляется из last_revision в ответе /api/inbox и WebSocket-сообщений) и last_connected_at. Используется как key/value, чтобы добавление новой настройки не требовало миграции схемы.

Таблицы синхронизируемых сущностей

Декларативно собираются из ENTITY_REGISTRY (electron/modules/sync/registry.ts). Каждая запись реестра содержит ddl, опциональные indexes, readSql и опциональный event (suffix + UPSERT/DELETE SQL + items SQL для вложенных массивов). Производные ENTITY_DDL/EVENT_SQL/ITEMS_SQL пересобираются автоматически. На загрузке модуля registry.ts сверяет каждый сгенерированный event_type с каталогом EVENT_TYPES из @locali/office-contracts — рассогласование падает рано, а не на первом «event_type не распознан» из inbox.

Текущие таблицы: employees, employee_positions, positions, subdivisions, sales_points, payment_methods, nomenclature_groups, pos_terminals, nomenclatures, modifiers, tech_cards, tech_card_ingredients, warehouse_stocks. Все DDL — через CREATE TABLE IF NOT EXISTS + CREATE INDEX IF NOT EXISTS (повторный запуск не падает).

Миграции и user_version

POS использует PRAGMA user_version для версионирования схемы:

  1. SCHEMA (в registry.ts) — целевое состояние. Свежие установки применяют её сразу и проставляют user_version = MIGRATIONS.length.
  2. MIGRATIONS (schema.ts) — append-only массив. Каждый элемент — один ALTER/CREATE/UPDATE для перехода N → N+1, идемпотентный в своём шаге.
  3. Прод-кассы прокручивают миграции по порядку, начиная с текущего user_version. После успеха user_version инкрементируется — повторно шаг не выполнится.

При добавлении/изменении поля в офисной сущности правится ddl в ENTITY_REGISTRY (для свежих установок) и добавляется новый элемент в MIGRATIONS (для существующих POS). Касса в этом не участвует — новый DDL доезжает до терминалов только с новым бандлом POS (см. раздел 4).

Применение входящих событий

POS получает событие через /api/inbox или WebSocket → берёт event.event_type → ищет SQL в локальной мапе EVENT_SQL → выполняет db.prepare(sql).run(event.payload). Параметры именованные (:id, :version, …) совпадают с полями payload. Если у события есть itemsSql (например, ингредиенты техкарты) — массив из payload[arrayField] вставляется отдельным prepared-statement в транзакции с основным UPSERT-ом.

Неизвестные event_type (нет ключа в ALLOWED_EVENT_TYPES) игнорируются — POS «забывает» события, для которых не подготовлен handler в текущей версии бандла.


4. Доставка клиента POS (auto-update)

Клиентское приложение POS — Electron-сборка под Windows (NSIS). У неё два независимых канала обновлений: .exe едет через electron-updater (медленный канал), renderer-бандл — отдельным zip (быстрый канал). Оба используют один S3-bucket в Yandex Object Storage, но разные префиксы.

4.1. Desktop-канал: electron-updater + Yandex S3

Содержимое канала — собранный NSIS-инсталлятор (.exe ~80 МБ): main + preload + IPC + DDL + SQL + embedded dist/. Релизы редкие — раз в спринт или при изменении IPC-контракта/схемы. Реализация: electron/modules/updater/desktop.ts (тикет LOCALIKASSA-263).

S3 (locali-kassa-updates/desktop/) POS (Electron-main)
│ │
│ latest.yml │ 1. initDesktopUpdater(win) — на старте + раз в час
│ Kassa Setup X.Y.Z.exe │ (TIMING.desktopUpdateCheckMs = 60 * 60 * 1000)
│ *.blockmap │ 2. autoUpdater.checkForUpdates()
│ │ 3. update-available → IPC «updater:available»
│ ◄────── GET latest.yml ──────────────────── │
│ ────── update info ─────────────────────► │ 4. autoDownload = true (фон, blockmap-diff)
│ ◄────── GET .exe + .blockmap (diff) ────── │ 5. update-downloaded → IPC «updater:downloaded»
│ │ 6. autoInstallOnAppQuit = true: применится
│ │ при следующем штатном выходе.
│ │ Кнопка «Перезапустить» в UI зовёт IPC
│ │ «updater:quit-and-install» → autoUpdater.quitAndInstall()
  • Bucket: locali-kassa-updates/desktop/ в Yandex Cloud, публичный read.
  • electron-builder publish: provider: generic (read-only) + ручной aws s3 sync на release. CI-job release-desktop собирает .exe через electron-builder и публикует *.exe + *.blockmap + latest.yml на тэг v*.*.*.
  • В dev (!app.isPackaged) updater полностью отключёнinitDesktopUpdater возвращается сразу, чтобы dev-машина не пыталась обновляться.
  • NSIS-only: portable-exe через electron-updater не обновляется.

4.2. Web-канал: manifest + zip + sha256

Renderer-бандл (dist/ от Vite ~1-2 МБ) — отдельный канал, чтобы гонять UI-фиксы без переустановки клиента и без пересборки electron-приложения. Реализация: electron/modules/updater/web.ts (тикет LOCALIKASSA-264).

S3 (locali-kassa-updates/web/) POS (Electron-main)
│ │
│ manifest.json (Cache-Control: no-cache) │ 1. checkAndDownload(WEB_MANIFEST_URL) перед createWindow()
│ <version>/bundle.zip (immutable) │ 2. GET manifest → { version, bundleUrl, sha256, compatMain? }
│ │ 3. semver.satisfies(app.getVersion(), compatMain) — если нет, skip
│ ◄────── GET manifest.json ───────────────── │ 4. installed.version === manifest.version — если да, skip
│ ────── { version, bundleUrl, sha256 } ────► │ 5. GET zip → проверить sha256
│ ◄────── GET <version>/bundle.zip ────────── │ 6. extract-zip → userData/web/<version>.staging.<pid>/
│ │ 7. fs.renameSync → userData/web/<version>/ (атомарно)
│ │ 8. write userData/web/current.json через .tmp + rename
│ │ 9. BrowserWindow.loadFile(resolveIndexPath(embedded))
│ │ — установленный bundle, либо embedded dist из .asar

Структура bucket:

locali-kassa-updates/web/
├── manifest.json ← Cache-Control: no-cache (постоянный URL, всегда указывает на актуальную версию)
├── a3f2d91/bundle.zip ← одна папка на версию (sha-хэш коммита)
├── b14ef8c/bundle.zip ← старые версии остаются — rollback правкой manifest.json
└── ...

Особенности:

  • compatMain: semver-диапазон desktop-версий, при которых bundle совместим (защита от рассинхрона main↔renderer). Любое изменение IPC-контракта (src/types/electron.d.ts) или бизнес-DTO → бамп major в package.json/version + новый compatMain=">=<major>". Старые .exe тогда не подтянут несовместимый bundle.
  • sha256 обязателен: без него подмена zip через MitM теоретически возможна (bucket публичный). С sha256 атакующему нужно подменить и zip, и manifest одновременно.
  • Атомарный install: распаковка в staging-директорию с PID-суффиксом → renameSync на финальный путь. Pointer current.json тоже пишется через .tmp + rename — никаких полузаписанных состояний.
  • Все ошибки глотаются. Если manifest битый / sha не совпал / нет сети — resolveIndexPath отдаёт предыдущий установленный bundle, или embedded dist/ из .asar. Касса/POS не падают на старте из-за проблем CDN.
  • Триггер CI: каждый merge в main, который меняет renderer-код (src/**, public/**, конфиги сборки). Изменения в electron/** требуют desktop-релиза — web-job на них не срабатывает.

4.3. Разделение каналов

Что меняетсяКаналПочему
Renderer-код (компоненты, стили, бизнес-логика UI)webдёшево и быстро, не требует переустановки
IPC-контракт, preload, main-процесс, DDL/SQL POSdesktopв .exe, без него bundle несовместим (compatMain блокирует)
Зависимости native-модулей (better-sqlite3 и т.п.)desktopмодули скомпилированы в .exe под конкретный Electron ABI

5. Сквозные потоки данных

5.1. Изменение номенклатуры (Офис → POS)

Офис Касса POS
│ │ │
│ 1. Изменение сущности │ │
│ (version = 5) │ │
│ 2. Транзакция сервиса: │ │
│ ├─ UPDATE (version = 5) │ │
│ └─ INSERT outbox_events │ │
│ (payload включает │ │
│ subdivision_id) │ │
│ │ │
│ 3. Outbox Scheduler │ │
│ └─ Producer ──Kafka──► │ │
│ (CloudEvent с │ │
│ version: 5) │ │
│ │ 4. Consumer │
│ │ ├─ INSERT inbound_events │
│ │ │ (revision = 124, │
│ │ │ sync_to_pos = true, │
│ │ │ subdivision_ids[]) │
│ │ ├─ handler.handle(): │
│ │ │ Check version │
│ │ └─ Upsert payload │
│ │ │
│ │ 5a. WebSocket push ────────►│ (real-time, терминалам чьи
│ │ { events: [...], │ subdivisionIds пересекаются
│ │ last_revision: 124 } │ с subdivision_ids события)
│ │ │
│ │ ИЛИ │
│ │ │ 5b. Polling GET /sync
│ │ │ ?last_synced_revision=123
│ │ ◄──────────────────────────│
│ │ SELECT FROM inbound_events │
│ │ WHERE revision > 123 │
│ │ AND sync_to_pos = true │
│ │ AND subdivision_ids │
│ │ hasSome <terminal> │
│ │ ──────────────────────────►│
│ │ │
│ │ │ 6. Взять event_type из ответа
│ │ │ 7. Найти SQL в локальном маппинге POS
│ │ │ 8. Выполнить SQL в локальной БД
│ │ │ 9. last_synced_revision = 124

5.2. Создание чека (POS → Офис)

POS Касса Офис
│ │ │
│ 1. Бизнес-операция │ │
│ │ │
│ 2. POST /api/receipts ────► │ │
│ Idempotency-Key: <uuid> │ │
│ (или pending_requests │ │
│ если офлайн) │ │
│ │ 3. Проверить Idempotency-Key│
│ │ Новый → выполнить: │
│ │ Transaction: │
│ │ ├─ INSERT receipts │
│ │ ├─ UPDATE cash_shifts │
│ │ └─ INSERT outbox_events │
│ │ Сохранить результат с Key│
│ │ │
│ │ 4. Outbox Scheduler │
│ │ └─ Producer ──Kafka──► │
│ │ │ 5. Consumer
│ │ │ └─ Upsert by UUID

Одна REST-операция кассы может породить несколько outbox-событий в одной транзакции. Пример — POST /api/cash-shifts/:id/close (LOCALIKASSA-298): касса считает расхождение actualCashAmount против расчётного остатка и при превышении Corporation.cashShiftDiscrepancyThreshold дополнительно создаёт CashShiftDocument (CASH_IN/CASH_OUT) по системному DWT-типу — в итоге публикуются cash_shift.closed плюс опционально deposit_withdrawal_operation.performed (source: 'POS_AUTO_DISCREPANCY'). Детали — в IMPLEMENT.md кассы (§ 8.6).

5.3. Офлайн-сценарий

POS (офлайн) Касса Офис
│ │ │
│ Нет сети │ │
│ Работает автономно │ │
│ Запросы копятся в │ │
│ pending_requests │ │
│ (с Idempotency-Key) │ │
│ │ │
│ ── Сеть восстановлена ── │ │
│ │ │
│ ШАГ 1: Догнать пропущенное │ │
│ GET /api/inbox? │ │
│ last_synced_revision=124 ►│ │
│ │ SELECT FROM │
│ │ inbound_events │
│ │ WHERE revision > 124 │
│ │ AND sync_to_pos=true│
│ │ AND subdivision_ids │
│ │ hasSome <terminal>│
│ ◄──────────────────────────│ │
│ │ │
│ Применение SQL из маппинга │ │
│ last_synced_revision = 189 │ │
│ │ │
│ ШАГ 2: Отправить отложенные│ │
│ POST (с Idempotency-Key)──►│ Обработка │
│ Пометка SENT │ (дубликаты │
│ │ отсекаются по Key) │
│ │ │
│ ШАГ 3: Восстановить WS │ │
│ WS CONNECT ────────────────►│ │
│ ◄── WS CONNECTED │ │

6. Ключевые архитектурные паттерны

  1. Transactional Outbox — офис и касса используют outbox-паттерн: бизнес-операция и запись события выполняются в одной транзакции, что гарантирует at-least-once доставку без распределённых транзакций. POS использует очередь отложенных REST-запросов (pending_requests).

  2. Idempotent Consumer — проверка version на стороне кассы предотвращает применение устаревших или повторных событий. Если version входящего события ≤ текущей в БД — событие получает статус SKIPPED.

  3. Idempotency-Key — все мутирующие запросы POS → Касса содержат заголовок Idempotency-Key. Касса хранит ключ + результат 48 часов. Повторный запрос с тем же ключом возвращает сохранённый ответ без повторного выполнения. Критично при нестабильной связи.

  4. Версионирование (version) — каждая синхронизируемая сущность в офисе хранит поле version, инкрементируемое при каждом изменении. Передаётся в CloudEvents и в /api/inbox. Используется для идемпотентной обработки на кассе и POS.

  5. Revision-based синхронизацияinbound_events на кассе имеет автоинкрементный revision. POS запрашивает изменения по revision > last_synced_revision. Это позволяет POS догонять пропущенные события после офлайна.

  6. CloudEvents — стандартизированный формат сообщений в Kafka. Каждое сообщение содержит метаданные (specversion, type, source, id, time, subject) и данные (data.version + data.payload).

  7. Локальная схема POS (in-bundle DDL) — POS носит DDL локальной SQLite и маппинг event_type → SQL внутри собственного бандла (registry.ts + schema.ts). Касса схему по сети не отдаёт. Доставка новых полей/таблиц — через релиз нового бандла POS (web- или desktop-канал, см. раздел 4); версионирование — PRAGMA user_version + append-only MIGRATIONS.

  8. Касса — единственная точка контакта для POS — POS общается только с кассой. /api/inbox отдаётся из БД кассы (не проксируется в офис). Операции (чеки, смены) отправляются на кассу. Офис не знает про POS.

  9. Dual transport, single handler — POS получает события через два канала (WebSocket и /api/inbox), но обрабатывает их одной функцией processEvents(). WebSocket — основной real-time канал, /api/inbox — recovery после обрыва.

  10. Offline-first (POS) — POS работает автономно, накапливая REST-запросы в pending_requestsIdempotency-Key). При восстановлении связи: 1) догнать через /api/inbox, 2) отправить отложенные запросы, 3) восстановить WebSocket.

  11. Конвенция event_type — полный тип события формируется по формуле prefix + entity_type + "." + action. Префикс зависит от источника: core.company.office. (офис), core.company.kassa. (касса). Сами префиксы и каталог известных event_type живут в общем пакете @locali/office-contracts (OFFICE_EVENT_TYPE_PREFIX, KASSA_EVENT_TYPE_PREFIX, EVENT_TYPES.*). Конвенция едина для Kafka-сообщений и ответа /api/inbox.

  12. Двухканальный auto-update POS — клиент POS обновляется через два независимых канала S3: desktop (.exe + electron-updater, редкие релизы) и web (renderer-bundle.zip + sha256 + compatMain, быстрые UI-фиксы). Разделение в разделе 4.

  13. Фильтрация событий для POS — не все события из Kafka предназначены для POS. Поле sync_to_pos в inbound_events определяет, нужно ли событие терминалам. /api/inbox и WebSocket push отдают только события с sync_to_pos = true.

  14. Роутинг по subdivision_ids — офис передаёт data.subdivision_ids (массив) в CloudEvent каждой сущности. Касса сохраняет массив subdivisionIds в inbound_events и использует Prisma-оператор hasSome для фильтрации: в /api/inbox и WebSocket push терминал получает только события, чей subdivision_ids пересекается с подразделениями терминала (массив из JWT).

  15. Авторизация через JWTcorporationId не передаётся в эндпоинтах, а извлекается из JWT. Супер-админ (User.isAdmin) через эндпоинт switch-corporation получает новый JWT с другим corporationId и работает с данными нужной корпорации через те же эндпоинты.

  16. Kafka partition-key per aggregate (LOCALIOFFICE-1003) — каждое outbox-событие публикуется в Kafka с явным ключом по наименьшему осмысленному агрегату (entityId или payload.employeeId/warehouseId). Это даёт consumer-у кассы FIFO внутри одного агрегата без сравнения timestamp’ов и оверхеда на re-ordering. Стратегия — в §1.8. Число партиций топиков office.events/kassa.events нельзя менять без миграции на новый топик: после resize старые ключи попадут в другие партиции и нарушится FIFO.