v0.9 · preview

Emitting events

A connector's second job, after providing tools, is turning what its source system says into EventEnvelopes: the one canonical event that a Shopify webhook, an MQTT temperature reading, and a cron tick all become. This page is the field-level guide for connector authors: what you must set, how to reference entities so single-flight works downstream, how to keep duplicates out with dedupe keys, and how to batch, backfill, and reason about ordering. Field names here match packages/kernel/src/envelope.ts exactly.

From source payload to envelope

The path is short and the division of labor is fixed. Your connector declares its streams in the def's events record and maps each source payload to the envelope fields it owns; the platform stamps identity and tenancy and puts the result on the bus, where the router matches it against operator triggers.

@fibric/kernel — EventEnvelope
export interface EventEnvelope {
  event_id: string;
  reseller_id: string | null; // null = Fibric-direct. Present on EVERY envelope.
  tenant_id: string;
  workspace_id: string | null;
  source: string;             // "shopify" | "bacnet-gw-7" | "cron" | "operator:jenny" | ...
  event_type: string;         // "order.created" | "hvac.zone.fault" | ...
  correlation_id: string;
  payload: Record<string, unknown>;
  agent_id: string | null;
  session_id: string | null;
}

Ingestion never acts. An envelope can only cause an operator to propose a plan, and the deterministic executor disposes that plan under policy. Emit freely; the governance boundary is downstream of you.

Declaring event streams

Each entry in the def's events record declares one stream, keyed by the event_type it emits:

events in the def
events: {
  'order.created':   { kind: 'webhook', topic: 'orders/create' },
  'order.updated':   { kind: 'webhook', topic: 'orders/updated' },
  'inventory.level': { kind: 'poll' },                 // no webhook offered upstream
},

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| key | string | yes | The event_type this stream emits, in dotted noun.verb form. Operators match triggers (order.*) against it. |
| kind | 'webhook' \| 'poll' | yes | webhook: the source pushes; the platform provisions an ingest URL per connection and verifies signatures. poll: the platform schedules pulls; the shape for hardware and APIs without webhooks. |
| topic | string | no | The upstream subscription name: a webhook topic, an MQTT topic, a queue. Documentation for connection setup, and what the platform subscribes to where it can do so automatically. |
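
To make the key column concrete, here is a minimal sketch of how a trigger pattern such as order.* might select declared event types. The matchesTrigger and declaredMatches helpers are hypothetical, and the wildcard rule (one `*` matches exactly one dotted segment) is an assumption; the router's actual matching rules may differ.

```typescript
// Hypothetical sketch: not the Fibric router's implementation.
// Assumption: a `*` segment matches exactly one dotted segment.
type StreamDecl = { kind: 'webhook' | 'poll'; topic?: string };

const events: Record<string, StreamDecl> = {
  'order.created': { kind: 'webhook', topic: 'orders/create' },
  'order.updated': { kind: 'webhook', topic: 'orders/updated' },
  'inventory.level': { kind: 'poll' },
};

// True when every pattern segment equals the corresponding event_type
// segment or is the wildcard `*`.
function matchesTrigger(pattern: string, eventType: string): boolean {
  const p = pattern.split('.');
  const e = eventType.split('.');
  if (p.length !== e.length) return false;
  return p.every((seg, i) => seg === '*' || seg === e[i]);
}

// Declared event types that a trigger pattern would select.
function declaredMatches(pattern: string): string[] {
  return Object.keys(events).filter((t) => matchesTrigger(pattern, t));
}
```

Under these assumptions, declaredMatches('order.*') selects both order streams and leaves inventory.level alone.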

Envelope fields you control

Of the ten envelope fields, four are stamped by the platform and are not yours to set; six are yours. Getting the split right is most of this page.

| Field | Who sets it | Guidance |
| --- | --- | --- |
| event_id | platform | Assigned at ingest. Never supply one; supply a dedupe key instead. |
| reseller_id, tenant_id, workspace_id | platform | Stamped from the connection. A connector cannot emit into another tenant; isolation is structural, not a field you fill in correctly. |
| source | you | Your connector id, or a per-device identifier for hardware fan-in: cn-magento, bacnet-gw-7. Stable, lowercase, meaningful in a receipt. |
| event_type | you | Must be one of the types declared in your def's events record. Emitting an undeclared type is rejected at ingest. |
| correlation_id | you, when continuing a thread | Omit for a fresh observation; the platform generates one. Set it when the event continues existing work, for example a delivery confirmation for an action an operator took. |
| payload | you | The observation, normalized. See below. |
| agent_id, session_id | you, operators only | Null for external observations. Operator packs set them so their own output is attributable in the stream. |
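
The split in the table can also be written down as types. The sketch below is an assumption about shape, not the kernel's actual API: ConnectorEmission, ConnectionScope, and stampEnvelope are hypothetical names, and the injected newId generator stands in for however the platform really assigns ids.

```typescript
// Hypothetical sketch of the ownership split. Only the EventEnvelope
// fields come from the kernel definition; every other name is invented.
interface EventEnvelope {
  event_id: string;
  reseller_id: string | null;
  tenant_id: string;
  workspace_id: string | null;
  source: string;
  event_type: string;
  correlation_id: string;
  payload: Record<string, unknown>;
  agent_id: string | null;
  session_id: string | null;
}

// What a connector hands over: the fields it owns plus a dedupe key.
type ConnectorEmission =
  Pick<EventEnvelope, 'source' | 'event_type' | 'payload'> &
  Partial<Pick<EventEnvelope, 'correlation_id' | 'agent_id' | 'session_id'>> &
  { dedupe_key: string };

// Tenancy comes from the connection, never from the emission.
type ConnectionScope = Pick<EventEnvelope, 'reseller_id' | 'tenant_id' | 'workspace_id'>;

function stampEnvelope(
  emission: ConnectorEmission,
  scope: ConnectionScope,
  newId: () => string, // assumed id generator supplied by the platform
): EventEnvelope {
  return {
    event_id: newId(),
    reseller_id: scope.reseller_id,
    tenant_id: scope.tenant_id,
    workspace_id: scope.workspace_id,
    source: emission.source,
    event_type: emission.event_type,
    // A fresh observation gets a generated correlation id.
    correlation_id: emission.correlation_id ?? newId(),
    payload: emission.payload,
    agent_id: emission.agent_id ?? null,
    session_id: emission.session_id ?? null,
  };
}
```

Because the emission type has no tenancy fields at all, a connector written against it has nothing to fill in incorrectly.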

For payload, normalize rather than forward. Emit the fields an operator can reason over, under the names the capability vocabulary already uses. Leave the vendor's raw blob out, or put it under a single raw key if downstream debugging truly needs it. A payload is not a place to smuggle a second schema.

mapping a webhook body
// what the platform asks your connector for, per declared stream:
// a pure mapping from the source payload to the envelope fields you own.
export function mapOrderCreated(hook: MagentoOrderHook) {
  return {
    source: 'cn-magento',
    event_type: 'order.created',
    dedupe_key: `magento:order:${hook.entity_id}:created`,   // see below
    payload: {
      order_id: String(hook.increment_id),
      entity: { kind: 'order', id: String(hook.increment_id) },
      total: Number(hook.grand_total),
      currency: hook.order_currency_code,
      customer_email: hook.customer_email,
      placed_at: hook.created_at,
    },
  };
}

Entity references

Downstream, the executor serializes side effects per entity using each planned action's entity_key, and operators build those keys from what your payload tells them. The convention that makes this work is an explicit entity reference in every payload:

the entity convention
payload: {
  entity: { kind: 'order', id: 'SO-11290' },   // what this event is ABOUT
  // ...the rest of the observation
}

// an operator sensing this event derives, mechanically:
//   entity_key:      'order:SO-11290'
//   idempotency_key: 'order-risk:order:SO-11290:hold'

Skipping the entity reference does not break ingest, but it breaks the thing that matters: an operator that cannot identify the entity cannot construct a correct entity_key, and single-flight degrades from per-entity to nothing.
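
The mechanical derivation mentioned above might look like the following sketch. The helper names (readEntity, entityKey, idempotencyKey) are hypothetical; only the resulting key formats are taken from the example comments, and the operator and action arguments are assumed inputs.

```typescript
// Hypothetical helpers: only the key formats come from the convention above.
interface EntityRef {
  kind: string;
  id: string;
}

// Narrow an untyped payload to its entity reference; null when the
// reference is absent or malformed.
function readEntity(payload: Record<string, unknown>): EntityRef | null {
  const e = payload['entity'];
  if (typeof e !== 'object' || e === null) return null;
  const { kind, id } = e as Record<string, unknown>;
  return typeof kind === 'string' && typeof id === 'string' ? { kind, id } : null;
}

// 'order' + 'SO-11290' becomes 'order:SO-11290'
function entityKey(ref: EntityRef): string {
  return `${ref.kind}:${ref.id}`;
}

// operator and action are assumed inputs, e.g. 'order-risk' and 'hold'.
function idempotencyKey(operator: string, ref: EntityRef, action: string): string {
  return `${operator}:${entityKey(ref)}:${action}`;
}
```

A payload without a usable entity makes readEntity return null, which is the point where an operator has nothing to build a per-entity key from.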

Dedupe keys

Sources are unreliable in one specific way: they deliver twice. Webhooks retry, pollers overlap, gateways reconnect and replay. The envelope bus dedupes at ingest on a dedupe_key you supply with each emission; two emissions with the same key within the dedupe window collapse into one envelope, and the second is acknowledged but not re-routed.

dedupe key construction
// source : entity kind : entity id : what happened [ : version discriminator ]
dedupe_key: `magento:order:${id}:created`            // creation happens once
dedupe_key: `magento:order:${id}:updated:${hook.updated_at}`  // updates recur; discriminate
dedupe_key: `bacnet-gw-7:zone:12:reading:${ts_bucket}`        // sensor sample per interval

| Rule | Why |
| --- | --- |
| Derive the key from the source's own identifiers, never from a hash of the whole payload. | Vendors reorder JSON keys and add fields between retries; a payload hash makes retries look like new events. |
| For recurring event types, include a version discriminator: the source's updated-at, revision number, or sample bucket. | Without one, the second legitimate update dedupes against the first and is lost. |
| Do not include your own timestamps or random values. | They defeat dedupe entirely; every retry becomes a fresh key. |
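
As an illustration of these rules, the sketch below pairs a key builder that follows the layout shown earlier with a small in-memory dedupe window. Both are hypothetical: the bus's real storage and window length are not specified on this page, so windowMs and the DedupeWindow class are assumptions.

```typescript
// Hypothetical sketch: a key builder plus an in-memory dedupe window.
interface KeyParts {
  source: string;
  kind: string;
  id: string;
  what: string;
  version?: string; // source-provided discriminator for recurring events
}

// source : entity kind : entity id : what happened [ : version ]
function dedupeKey(p: KeyParts): string {
  const parts = [p.source, p.kind, p.id, p.what];
  if (p.version !== undefined) parts.push(p.version);
  return parts.join(':');
}

class DedupeWindow {
  private readonly seen = new Map<string, number>(); // key -> first-seen time (ms)
  private readonly windowMs: number;

  constructor(windowMs: number) {
    this.windowMs = windowMs; // assumed parameter; the real window is platform-defined
  }

  // true: the key is new within the window, so the envelope is routed.
  // false: a duplicate, acknowledged but not re-routed.
  accept(key: string, nowMs: number): boolean {
    const first = this.seen.get(key);
    if (first !== undefined && nowMs - first < this.windowMs) return false;
    this.seen.set(key, nowMs);
    return true;
  }
}
```

Note that the clock value is passed in by the caller and never becomes part of the key, in line with the third rule.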
Two dedupe layers, on purpose

Ingest dedupe (your dedupe_key) keeps duplicate observations off the bus. Executor dedupe (the plan's idempotency_key) keeps duplicate side effects out of the world. They are independent, and the second still protects you when the first misses, which is exactly the layering that made the 657-message flood structurally impossible to repeat.

Batching

Poll-kind streams and busy webhooks produce bursts. Emit them as a batch, not a loop of single publishes: a batch is atomic at ingest (all envelopes accepted or none), dedupe applies per element, and it counts once against connection throughput rather than per element.

batch emission from a poll
// a poll handler returns the batch; the platform ingests it atomically
export async function pollInventory(ctx: ConnectorCtx) {
  const levels = await fetchLevels(ctx);            // one upstream call
  return levels.map((l) => ({
    source: 'cn-magento',
    event_type: 'inventory.level',
    dedupe_key: `magento:sku:${l.sku}:level:${l.as_of}`,
    payload: { entity: { kind: 'sku', id: l.sku }, qty: l.qty, as_of: l.as_of },
  }));
}
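
The batch semantics described above (all-or-nothing acceptance, dedupe per element) can be sketched as follows. ingestBatch and its result shape are hypothetical and only model the behavior in memory; they are not the platform's ingest API, and the only validation shown is the declared-type check.

```typescript
// Hypothetical model of batch ingest; names and shapes are invented.
interface Emission {
  source: string;
  event_type: string;
  dedupe_key: string;
  payload: Record<string, unknown>;
}

type BatchResult =
  | { accepted: false; reason: string }
  | { accepted: true; routed: Emission[]; duplicates: number };

function ingestBatch(
  batch: Emission[],
  declaredTypes: Set<string>,
  seenKeys: Set<string>,
): BatchResult {
  // Atomicity: validate every element before accepting any of them.
  for (const e of batch) {
    if (!declaredTypes.has(e.event_type)) {
      return { accepted: false, reason: `undeclared event_type: ${e.event_type}` };
    }
  }
  // Dedupe per element: duplicates are acknowledged but not routed.
  const routed: Emission[] = [];
  let duplicates = 0;
  for (const e of batch) {
    if (seenKeys.has(e.dedupe_key)) {
      duplicates++;
      continue;
    }
    seenKeys.add(e.dedupe_key);
    routed.push(e);
  }
  return { accepted: true, routed, duplicates };
}
```

Validation runs over the whole batch before any key is recorded, so a rejected batch leaves the dedupe state untouched and can be retried as a unit.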

Backfill

When a connection is first made, or repaired after an outage, the tenant usually wants history. Backfill is the same mapping run over the source's list APIs, with two switches that make it safe:

CLI
# replay two weeks of source history through the connector's mapping
$ fibric connectors backfill cn-magento --connection magento-live \
    --stream order.created --since 2026-06-18 --mode quiet
  fetched 3,412 records  ·  emitted 3,412 envelopes (dedupe skipped 0)
  mode quiet: envelopes stored and queryable; operators were not triggered

| Mode | Behavior | Use when |
| --- | --- | --- |
| quiet | Envelopes land in the store and are queryable, but the router does not trigger operators. | Seeding history. You rarely want an operator reacting today to an order from June. |
| live | Envelopes route normally. | Closing a short gap after an outage, where the reactions are still wanted. |

Because your dedupe keys are derived from source identifiers, backfill is naturally re-runnable: a second pass over the same window emits nothing new. This is the property to test before shipping a connector. fibric dev replay --twice exercises it locally; see fixture replay.
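
The re-runnability property is easy to check in a unit test of your own mapping. The sketch below is a hypothetical stand-in, not what fibric dev replay does internally: SourceRecord, mapRecord, and replayPass are invented names, and the set of seen keys models ingest dedupe in memory.

```typescript
// Hypothetical re-runnability check for a mapping function.
interface SourceRecord {
  entity_id: string;
  increment_id: string;
}

// Same key construction as the order.created mapping: source identifiers only.
function mapRecord(r: SourceRecord) {
  return {
    event_type: 'order.created',
    dedupe_key: `magento:order:${r.entity_id}:created`,
    payload: { entity: { kind: 'order', id: r.increment_id } },
  };
}

// Runs one pass over the records and returns how many emissions carried
// a key that had not been seen before.
function replayPass(records: SourceRecord[], seen: Set<string>): number {
  let emitted = 0;
  for (const r of records) {
    const key = mapRecord(r).dedupe_key;
    if (!seen.has(key)) {
      seen.add(key);
      emitted++;
    }
  }
  return emitted;
}
```

A mapping that mixed a local timestamp or a random value into dedupe_key would emit on every pass and fail this check.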

Ordering guidance

The bus preserves emission order per connection and per batch, but sources do not: webhook retries arrive late, pollers see state after several changes collapsed, and two gateways racing emit interleaved. Design for it rather than around it:
