Skip to content

Fleet Telematics Integration Architecture rewrite, Teltonika Integration rewrite#2571

Draft
pankalog wants to merge 26 commits into
masterfrom
feature/teltonika
Draft

Fleet Telematics Integration Architecture rewrite, Teltonika Integration rewrite#2571
pankalog wants to merge 26 commits into
masterfrom
feature/teltonika

Conversation

@pankalog
Copy link
Copy Markdown
Member

@pankalog pankalog commented Mar 1, 2026

Important

This PR will not be merged into the main repository, but will be eventually converted into an extension. the PR is here as it allows for testing, test deployment, and easy review by reviewers, and since all the code is backend-based, it can be trivially converted into an extension.

Tip

You can find various diagrams and textual descriptions at the end of the PR that may help you understand the overall function of the code.

Description

This PR contains the code for the ground-up rewrite of the Telematics integration of OpenRemote.

After years of using the current version of the Teltonika integration in openremote/fleet-management, a few problems were identified:

  1. Too rigid: the implementation was essentially a long MQTT handler class; no adjustability, no option to use the same logic in different ingress points, etc.
  2. Inextensible: There was no real architectural work done to allow the integration of different telematics hardware providers without essentially rewriting all of the business logic. This severely restricts our opportunities for expanding into the support other hardware providers.
  3. Type-unsafe: There essentially is no type safety in the current version. Values that come in from MQTT essentially are opportunistically coerced into a number or a String. No in between, no hex support, nothing. This in turn doesn't allow proper use of the attributes' values in the rest of OpenRemote (think rules, maps, Insights, etc.)
  4. Unstable: Various conditions (especially revolving around the Teltonika Configuration assets) could lead to an instance being abruptly stopped, or could even lead to configuration assets being deleted, which would essentially require accessing and modifying the database directly to return back to an operating state.

Because of these issues, a decision was made to rewrite and gradually deploy a new version of the Telematics integration to make it adhere to proper system design principles.

With a few short words: There are various ingress routes for tracker data to flow inwards, but there's only one way to actually push that data to OpenRemote. To parse messages from devices, we extend the ValueDescriptor to include a parser, which takes in a ByteBuffer and a length, and allows the developer to define exactly how the parameter is parsed.

Commentary and Supporting Diagrams

The important architectural decision is that MQTT, TCP, and UDP are no longer "special systems" with separate asset update logic; they are just different ways of obtaining a DeviceMessage. Once a DeviceMessage exists, the message is handed to TelematicsService and processed through the same queue-backed flow as other telematics input. In practice this means transport classes decode and submit, while create/update/apply behavior is centralized in the message handler/mapping layer.

At model level, the central representation is DeviceMessage (device identity + typed attribute list), and Teltonika semantics are captured by TeltonikaParameter + TeltonikaRegistry + TeltonikaAttributeResolver. This keeps protocol parsing details local to Teltonika metadata but avoids leaking transport details into asset processing. It also preserves strict parse visibility: malformed data still fails at decode/resolve boundaries with explicit exceptions, rather than being silently swallowed or transformed into ambiguous states.

A key part of the PR is mapping consistency between protocol-level fields and OpenRemote-level asset attributes. Known Teltonika parameters are resolved first through TeltonikaTrackerAsset descriptors; this ensures stable, platform-facing attribute names for mapped parameters. For known-but-unmapped parameters, fallback naming is derived from parameter metadata (display-name normalization) rather than opaque numeric IDs; for unknown parameters, deterministic teltonika_ fallback is preserved. This gives us backwards-safe behavior while improving readability and discoverability of newly surfaced parameters.

The mapper layer (TeltonikaAssetMapper) now explicitly handles normalization of well-known OpenRemote attributes (notably location), so platform features like map views work off canonical attributes instead of vendor-specific aliases. Preprocessing now builds a new attribute list from the incoming message (rather than mutating in place), which makes behavior easier to review and reason about: each incoming attribute is either rewritten or passed through, and final normalized output is explicit.

Agent-side changes align with the same direction: the Teltonika agent/protocol stack can ingest TCP/UDP (including dual mode), decode records, and submit to TelematicsService via a publisher interface instead of applying asset updates directly. That avoids a second business-logic path in the agent module and keeps one source of truth for connection state updates, queueing, asset provisioning, and attribute event emission. In other words, the agent is now ingress infrastructure, not a parallel business processor.

Overall, this PR is less about adding one feature and more about reducing architectural drift: one message model, one processing pipeline, one mapping policy, and transport-specific code constrained to decode/ack responsibilities. The net effect should be easier evolution (new codecs/params), less behavioral divergence between transports, and clearer observability when parsing or mapping fails.

Telematics Class Diagram

classDiagram
direction LR

namespace model_telematics_core {
  class DeviceMessage {
    -String deviceId
    -String protocolName
    -List~Attribute~ attributes
    +getDeviceId() String
    +getProtocolName() String
    +getAttributes() List~Attribute~
  }

  class TelematicsMessageEnvelope {
    -String vendorId
    -String deviceId
    -String realm
    -String protocolId
    -Transport transport
    -Instant receivedAt
    -DeviceMessage message
  }

  class TelematicsMessageHandler {
    <<interface>>
    +getVendorId() String
    +process(envelope, connection) Optional~String~
  }

  class TelematicsMessagePublisher {
    <<interface>>
    +submitMessage(vendorId, realm, transport, codecId, message) void
  }
}

namespace model_telematics_protocol {
  class DeviceProtocol {
    <<interface>>
    +getProtocolId() String
    +canHandle(data, context) boolean
    +decode(data, context) List~DeviceMessage~
    +encodeCommand(command, context) Optional~ByteBuf~
  }

  class DeviceCommand
  class MessageContext
  class ProtocolDecodeException
  class ProtocolEncodeException
}

namespace manager_telematics {
  class TelematicsVendor {
    <<interface>>
    +getVendorId() String
    +getProtocol() DeviceProtocol
    +getParameterRegistry() TelematicsParameterRegistry
    +getAssetMapper() TelematicsAssetMapper
    +createMessageHandler(...) TelematicsMessageHandler
  }

  class TelematicsService {
    +TELEMATICS_MESSAGE_QUEUE String
    +registerVendor(vendor) void
    +markTrackerConnected(...) DeviceConnection
    +submitMessage(vendorId, realm, transport, codecId, message) void
    -processEnvelope(envelope) void
  }

  class TeltonikaVendor
  class TeltonikaMessageHandler
  class TeltonikaMqttJsonProtocol
  class TeltonikaTransportProtocol
  class TeltonikaCommandMapper
  class TeltonikaSessionManager
  class TeltonikaMQTTHandler
}

namespace model_telematics_teltonika {
  class TeltonikaParameter~T~ {
    +getId() String
    +getDisplayName() Optional~String~
    +builder() Builder
    +builder(valueDescriptor, parser) Builder
  }

  class TeltonikaParameters {
    <<catalog>>
    +TIMESTAMP TeltonikaParameter
    +EYE_MOVEMENT_COUNT_1 TeltonikaParameter
    +... many more ...
  }

  class TeltonikaRegistry {
    <<singleton>>
    +getById(id) Optional~TeltonikaParameter~
    +getByFullName(name) Optional~TeltonikaParameter~
    +findMatchingAttributeDescriptor(assetClass, valueDescriptor) Optional~AttributeDescriptor~
  }

  class TeltonikaAttributeResolver {
    +resolveBinaryIo(parameterId, value, timestamp) Attribute
    +resolveJson(parameterId, rawValue, timestamp) Attribute
  }

  class TeltonikaAssetMapper {
    +createAsset(imei, realm) TeltonikaTrackerAsset
    +generateAssetId(deviceId) String
    +applyAttributes(asset, message) List~Attribute~
    +preProcessAttributes(asset, message) List~Attribute~
  }

  class TeltonikaTrackerAsset
}

namespace agent_teltonika {
  class TeltonikaAgent
  class TeltonikaProtocol
  class TeltonikaTcpServer
  class TeltonikaUdpServer
  class TeltonikaProtocolDecoder
  class TeltonikaRecord
}

class AssetStorageService
class AssetProcessingService
class DeviceConnection
class DeviceSessionManager
class TelematicsParameterRegistry~T~
class TelematicsAssetMapper~A~
class Attribute
class AttributeDescriptor
class ValueDescriptor
class Agent~T,U,V~
class DefaultAgentLink
class AbstractProtocol~A,L~
class AbstractTCPServer~M~
class AbstractUDPServer~M~
class ByteToMessageDecoder
class ByteBuf
class ContainerService

TelematicsService ..|> ContainerService
TelematicsService ..|> TelematicsMessagePublisher
TelematicsService --> TelematicsVendor
TelematicsService --> TelematicsMessageHandler
TelematicsService --> TelematicsMessageEnvelope
TelematicsService --> DeviceConnection
TelematicsService --> AssetStorageService
TelematicsService --> AssetProcessingService

TelematicsVendor --> DeviceProtocol
TelematicsVendor --> TelematicsParameterRegistry~T~
TelematicsVendor --> TelematicsAssetMapper~A~
TelematicsVendor --> DeviceSessionManager
TelematicsVendor --> TelematicsMessageHandler

TeltonikaVendor ..|> TelematicsVendor
TeltonikaVendor --> TeltonikaRegistry
TeltonikaVendor --> TeltonikaAssetMapper
TeltonikaVendor --> TeltonikaCommandMapper
TeltonikaVendor --> TeltonikaMqttJsonProtocol
TeltonikaVendor --> TeltonikaTransportProtocol
TeltonikaVendor --> TeltonikaSessionManager
TeltonikaVendor --> TeltonikaMessageHandler

TeltonikaMqttJsonProtocol ..|> DeviceProtocol
TeltonikaTransportProtocol ..|> DeviceProtocol
TeltonikaMqttJsonProtocol --> TeltonikaAttributeResolver
TeltonikaMqttJsonProtocol --> DeviceMessage
TeltonikaMqttJsonProtocol --> MessageContext
TeltonikaMqttJsonProtocol --> ByteBuf
TeltonikaMqttJsonProtocol --> ProtocolDecodeException
TeltonikaMqttJsonProtocol --> ProtocolEncodeException

TeltonikaMQTTHandler --> TeltonikaVendor
TeltonikaMQTTHandler --> TelematicsService
TeltonikaMQTTHandler --> DeviceProtocol
TeltonikaMQTTHandler --> MessageContext

TeltonikaMessageHandler ..|> TelematicsMessageHandler
TeltonikaMessageHandler --> TeltonikaAssetMapper
TeltonikaMessageHandler --> TeltonikaTrackerAsset
TeltonikaMessageHandler --> DeviceMessage
TeltonikaMessageHandler --> DeviceConnection

TeltonikaRegistry ..|> TelematicsParameterRegistry~T~
TeltonikaRegistry --> TeltonikaParameters
TeltonikaRegistry --> TeltonikaParameter
TeltonikaRegistry --> AttributeDescriptor

TeltonikaParameter --|> ValueDescriptor
TeltonikaParameters --> TeltonikaParameter
TeltonikaAttributeResolver --> TeltonikaRegistry
TeltonikaAttributeResolver --> TeltonikaParameter
TeltonikaAttributeResolver --> TeltonikaTrackerAsset
TeltonikaAttributeResolver --> Attribute

TeltonikaAssetMapper ..|> TelematicsAssetMapper~A~
TeltonikaAssetMapper --> TeltonikaTrackerAsset
TeltonikaAssetMapper --> DeviceMessage
TeltonikaAssetMapper --> Attribute

TeltonikaAgent --|> Agent~T,U,V~
TeltonikaAgent --> TeltonikaProtocol

TeltonikaProtocol --|> AbstractProtocol~A,L~
TeltonikaProtocol --> TeltonikaTcpServer
TeltonikaProtocol --> TeltonikaUdpServer
TeltonikaProtocol --> TelematicsMessagePublisher
TeltonikaProtocol --> TeltonikaRecord
TeltonikaProtocol --> DeviceMessage

TeltonikaTcpServer --|> AbstractTCPServer~M~
TeltonikaUdpServer --|> AbstractUDPServer~M~
TeltonikaTcpServer --> TeltonikaProtocolDecoder
TeltonikaUdpServer --> TeltonikaProtocolDecoder

TeltonikaProtocolDecoder --|> ByteToMessageDecoder
TeltonikaProtocolDecoder --> TeltonikaRecord
TeltonikaProtocolDecoder --> TeltonikaAttributeResolver
TeltonikaProtocolDecoder --> TeltonikaRegistry
TeltonikaProtocolDecoder --> ByteBuf
Loading

Telematics Sequence Diagrams

End-to-End Ingestion (MQTT, TCP, UDP)

sequenceDiagram
    autonumber

    box rgba(232,245,255,0.6) Ingestion Sources
      participant DeviceMQTT as Teltonika Device (MQTT)
      participant DeviceTCP as Teltonika Device (TCP)
      participant DeviceUDP as Teltonika Device (UDP)
    end

    box rgba(233,255,239,0.6) Ingress Layer
      participant MQTTHandler as TeltonikaMQTTHandler
      participant AgentProtocol as TeltonikaProtocol (Agent)
      participant Decoder as TeltonikaProtocolDecoder
      participant DeviceProtocol as DeviceProtocol (Teltonika)
    end

    box rgba(255,248,232,0.6) Normalized Messaging
      participant Publisher as TelematicsMessagePublisher
      participant Service as TelematicsService
      participant Queue as SEDA telematics-device-messages
      participant Envelope as TelematicsMessageEnvelope
    end

    box rgba(255,234,242,0.6) Processing Layer
      participant Handler as TeltonikaMessageHandler
      participant Mapper as TeltonikaAssetMapper
      participant Resolver as TeltonikaAttributeResolver
      participant Registry as TeltonikaRegistry
      participant Store as AssetStorageService
      participant Processing as AssetProcessingService
    end

    alt MQTT path
      DeviceMQTT->>MQTTHandler: publish JSON on realm/client/teltonika/{imei}/data
      MQTTHandler->>DeviceProtocol: decode(ByteBuf, MessageContext)
      DeviceProtocol->>Resolver: resolveJson(paramId, rawValue, ts)
      Resolver->>Registry: lookup TeltonikaParameter + matching descriptor
      Registry-->>Resolver: known/unknown mapping metadata
      Resolver-->>DeviceProtocol: Attribute list entries
      DeviceProtocol-->>MQTTHandler: List<DeviceMessage>
      MQTTHandler->>Service: submitMessage(vendorId, realm, MQTT, codec, message)
    end

    alt TCP path
      DeviceTCP->>AgentProtocol: send identification/AVL packet
      AgentProtocol->>Decoder: decode TCP frame
      Decoder->>Resolver: resolveBinaryIo(avlId, bytes, ts)
      Resolver->>Registry: parameter/descriptor lookup
      Registry-->>Resolver: mapping info
      Resolver-->>Decoder: typed attributes
      Decoder-->>AgentProtocol: TeltonikaRecord(s)
      AgentProtocol->>AgentProtocol: toDeviceMessage(record)
      AgentProtocol->>Publisher: submitMessage(vendorId, realm, TCP, codec, message)
      Publisher->>Service: submitMessage(...)
    end

    alt UDP path
      DeviceUDP->>AgentProtocol: send UDP AVL packet
      AgentProtocol->>Decoder: decode UDP frame
      Decoder-->>DeviceUDP: UDP ACK
      Decoder-->>AgentProtocol: TeltonikaRecord(s)
      AgentProtocol->>AgentProtocol: toDeviceMessage(record)
      AgentProtocol->>Publisher: submitMessage(vendorId, realm, UDP, codec, message)
      Publisher->>Service: submitMessage(...)
    end

    Service->>Service: markTrackerConnected / update DeviceConnection
    Service->>Envelope: build envelope(vendor,device,realm,protocol,transport,message)
    Service->>Queue: enqueue envelope

    Queue->>Service: dequeue envelope
    Service->>Handler: process(envelope, connection)
    Handler->>Mapper: getOrCreateAsset + applyAttributes
    Mapper->>Mapper: preProcessAttributes (normalize well-known fields)
    Handler->>Store: merge asset
    Handler->>Processing: send attribute events
    Handler-->>Service: Optional<assetId>
    Service->>Service: update connection assetId
Loading

Key Mapping Resolution Flow

sequenceDiagram
    autonumber
    participant Protocol as TeltonikaMqttJsonProtocol / Decoder
    participant Resolver as TeltonikaAttributeResolver
    participant Registry as TeltonikaRegistry
    participant Tracker as TeltonikaTrackerAsset metadata

    Protocol->>Resolver: resolveJson/resolveBinaryIo(parameterId, raw, ts)
    Resolver->>Registry: getById/getByFullName(parameterId)

    alt Known TeltonikaParameter
      Registry-->>Resolver: descriptor metadata (id, type, parser, displayName)
      Resolver->>Registry: findMatchingAttributeDescriptor(TeltonikaTrackerAsset, parameter)

      alt Descriptor exists on tracker asset
        Registry-->>Resolver: AttributeDescriptor match
        Resolver-->>Protocol: Attribute(name = tracker descriptor name, typed value)
      else Descriptor missing
        Registry-->>Resolver: no match
        Resolver-->>Protocol: Attribute(name = normalized displayName, typed value)
      end
    else Unknown parameter
      Registry-->>Resolver: empty
      Resolver-->>Protocol: Attribute(name = teltonika_{id}, inferred type/raw)
    end
Loading

…etc.

Needs further work in the attribute generation and parsing. More AVL IDs should also be added from other trackers, and also add the parsing logic from the old version if the attribute is unknown.
# Conflicts:
#	manager/src/map/mapdata.mbtiles
#	ui/component/or-vaadin-components/src/or-vaadin-number-field.ts
#	ui/component/or-vaadin-components/src/or-vaadin-password-field.ts
#	ui/component/or-vaadin-components/src/or-vaadin-text-area.ts
#	ui/component/or-vaadin-components/src/or-vaadin-text-field.ts
#	ui/component/or-vaadin-components/stories/or-vaadin-text-field.stories.ts
@pankalog pankalog requested review from a team and Copilot March 1, 2026 14:36
@pankalog pankalog linked an issue Mar 1, 2026 that may be closed by this pull request
4 tasks
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Ground-up rewrite of the telematics integration to introduce a unified, vendor-driven ingestion/processing pipeline (MQTT/TCP/UDP all produce DeviceMessage and flow through TelematicsService), with a full Teltonika implementation (registry/descriptor parsing, asset mapping, agent + Netty decoder) and accompanying tests.

Changes:

  • Introduces core telematics model abstractions (DeviceMessage, envelope/handler/publisher, protocol + parameter parsing APIs) and centralizes processing in TelematicsService (queue-backed).
  • Adds Teltonika vendor implementation across model/manager/agent layers (parameter registry + resolver, tracker asset, asset mapper, MQTT handler, TCP/UDP servers + protocol decoder).
  • Adds extensive integration/decoder/MQTT command tests and updates build/service registration wiring.

Reviewed changes

Copilot reviewed 45 out of 46 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
ui/app/swagger/build.gradle Adjusts swagger UI resource generation task duplicate handling.
test/src/test/groovy/org/openremote/test/protocol/mqtt/TeltonikaMQTTClientProtocolTest.groovy Adds large end-to-end and decoder tests for Teltonika MQTT/TCP/UDP flows.
test/src/test/groovy/org/openremote/manager/mqtt/TeltonikaMQTTHandlerCommandTest.groovy Adds unit + end-to-end tests for Teltonika MQTT command and ingest processing.
model/src/main/java/org/openremote/model/telematics/teltonika/TeltonikaTrackerAsset.java Defines Teltonika tracker asset and attribute descriptors.
model/src/main/java/org/openremote/model/telematics/teltonika/TeltonikaRegistry.java Implements Teltonika parameter registry + dynamic parameter creation + descriptor matching.
model/src/main/java/org/openremote/model/telematics/teltonika/TeltonikaParameter.java Defines Teltonika parseable parameter descriptor/builder with AVL metadata.
model/src/main/java/org/openremote/model/telematics/teltonika/TeltonikaAttributeResolver.java Resolves Teltonika parameters into typed OpenRemote attributes for JSON/binary.
model/src/main/java/org/openremote/model/telematics/teltonika/TeltonikaAssetMapper.java Creates/provisions Teltonika assets and normalizes well-known attributes.
model/src/main/java/org/openremote/model/telematics/session/DeviceSessionManager.java Adds device session manager abstraction.
model/src/main/java/org/openremote/model/telematics/session/DeviceSession.java Adds device session abstraction/state model.
model/src/main/java/org/openremote/model/telematics/session/DeviceConnection.java Adds runtime device connection state tracking.
model/src/main/java/org/openremote/model/telematics/protocol/ProtocolEncodeException.java Adds protocol encode exception type.
model/src/main/java/org/openremote/model/telematics/protocol/ProtocolDecodeException.java Adds protocol decode exception type.
model/src/main/java/org/openremote/model/telematics/protocol/MessageContext.java Adds mutable per-message/per-connection context (transport, deviceId, etc.).
model/src/main/java/org/openremote/model/telematics/protocol/DeviceProtocol.java Defines vendor protocol API for decode/encode/acks/identification.
model/src/main/java/org/openremote/model/telematics/protocol/DeviceCommand.java Adds a simple device command wrapper.
model/src/main/java/org/openremote/model/telematics/parameter/TelematicsParameterRegistry.java Defines vendor parameter registry API (known + dynamic).
model/src/main/java/org/openremote/model/telematics/parameter/StandardParsers.java Adds standard binary parsers/inference helpers.
model/src/main/java/org/openremote/model/telematics/parameter/ParseableValueDescriptor.java Extends ValueDescriptor with parsing capability + helpers.
model/src/main/java/org/openremote/model/telematics/parameter/ParameterRegistry.java Adds an additional registry interface (generic descriptor lookup).
model/src/main/java/org/openremote/model/telematics/parameter/ParameterParser.java Adds parser strategy interface for byte buffer decoding.
model/src/main/java/org/openremote/model/telematics/parameter/ParameterParseException.java Adds parse exception type for parameter parsing failures.
model/src/main/java/org/openremote/model/telematics/parameter/DynamicParameter.java Adds dynamic (unknown-ID) parameter descriptor generation.
model/src/main/java/org/openremote/model/telematics/core/TelematicsMessagePublisher.java Adds publisher interface for submitting decoded messages to the pipeline.
model/src/main/java/org/openremote/model/telematics/core/TelematicsMessageHandler.java Adds handler interface for vendor-specific asset update processing.
model/src/main/java/org/openremote/model/telematics/core/TelematicsMessageEnvelope.java Adds immutable queue envelope for message processing.
model/src/main/java/org/openremote/model/telematics/core/DeviceMessage.java Adds normalized message model (deviceId + protocol + attribute list).
model/src/main/java/org/openremote/model/telematics/asset/TelematicsAssetMapper.java Adds vendor asset mapper API for provisioning and attribute application.
model/src/main/java/org/openremote/model/geo/GeoJSONPoint.java Adds equals/hashCode override for GeoJSON point equality.
model/src/main/java/org/openremote/model/geo/GeoJSON.java Adds equals/hashCode base implementation.
model/src/main/java/org/openremote/model/StandardModelProvider.java Registers TeltonikaParameters as a model descriptor provider.
model/build.gradle Adds Netty buffer dependency to model module.
manager/src/main/resources/META-INF/services/org.openremote.model.ContainerService Registers TelematicsService as a container service.
manager/src/main/resources/META-INF/services/org.openremote.manager.mqtt.MQTTHandler Registers TeltonikaMQTTHandler as an MQTT handler.
manager/src/main/java/org/openremote/manager/telematics/TeltonikaVendor.java Implements Teltonika vendor, protocols, command mapping, and session manager.
manager/src/main/java/org/openremote/manager/telematics/TeltonikaMessageHandler.java Implements vendor message handler that provisions/updates assets and emits events.
manager/src/main/java/org/openremote/manager/telematics/TeltonikaMQTTHandler.java Implements MQTT ingress handler for Teltonika device topics and command publishing.
manager/src/main/java/org/openremote/manager/telematics/TelematicsVendor.java Adds vendor interface describing protocols/registries/mappers/handlers.
manager/src/main/java/org/openremote/manager/telematics/TelematicsService.java Adds central telematics queue-backed processing + connection tracking.
agent/src/main/java/org/openremote/agent/protocol/teltonika/TeltonikaUdpServer.java Adds UDP server wrapper wiring Teltonika decoder.
agent/src/main/java/org/openremote/agent/protocol/teltonika/TeltonikaTcpServer.java Adds TCP server wrapper wiring Teltonika decoder.
agent/src/main/java/org/openremote/agent/protocol/teltonika/TeltonikaRecord.java Adds record model for decoded Teltonika AVL frames.
agent/src/main/java/org/openremote/agent/protocol/teltonika/TeltonikaProtocolDecoder.java Adds Netty decoder for Teltonika AVL codecs (TCP/UDP).
agent/src/main/java/org/openremote/agent/protocol/teltonika/TeltonikaProtocol.java Updates agent protocol to publish decoded messages via TelematicsMessagePublisher.
agent/src/main/java/org/openremote/agent/protocol/teltonika/TeltonikaAgent.java Adds agent asset for running Teltonika TCP/UDP servers.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +61 to +69
List<Attribute<?>> newAttributes = assetMapper.applyAttributes(asset, message);
assetStorageService.merge(asset);

for (Attribute<?> attribute : message.getAttributes()) {
boolean isNew = newAttributes.stream().anyMatch(a -> Objects.equals(a.getName(), attribute.getName()));
if (isNew || attribute.getValue().isEmpty()) {
continue;
}
assetProcessingService.sendAttributeEvent(new AttributeEvent(asset.getId(), attribute.getName(), attribute.getValue().get()));
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Attribute events are emitted using message.getAttributes(), but applyAttributes() preprocesses/renames attributes (e.g. gpsLocation -> location) without mutating the DeviceMessage. This can cause events to be published under names that were not actually applied to the asset. Emit events from the pre-processed/applied attribute list instead (and use the same names stored on the asset).

Suggested change
List<Attribute<?>> newAttributes = assetMapper.applyAttributes(asset, message);
assetStorageService.merge(asset);
for (Attribute<?> attribute : message.getAttributes()) {
boolean isNew = newAttributes.stream().anyMatch(a -> Objects.equals(a.getName(), attribute.getName()));
if (isNew || attribute.getValue().isEmpty()) {
continue;
}
assetProcessingService.sendAttributeEvent(new AttributeEvent(asset.getId(), attribute.getName(), attribute.getValue().get()));
List<Attribute<?>> appliedAttributes = assetMapper.applyAttributes(asset, message);
assetStorageService.merge(asset);
for (Attribute<?> attribute : appliedAttributes) {
if (attribute.getValue().isEmpty()) {
continue;
}
assetProcessingService.sendAttributeEvent(
new AttributeEvent(asset.getId(), attribute.getName(), attribute.getValue().get())
);

Copilot uses AI. Check for mistakes.
Comment on lines +96 to +98
public void incrementMessageCount() {
this.messageCount++;
}
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

messageCount++ is not atomic; with concurrentConsumers=4 on the telematics queue, multiple threads can update the same DeviceConnection concurrently and lose increments. Use an AtomicLong (or LongAdder) or synchronize updates so message counts remain accurate.

Copilot uses AI. Check for mistakes.
Comment on lines +104 to +106
public void incrementConnectionCount() {
this.connectionCount++;
}
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

connectionCount++ is not atomic; if connection state updates happen concurrently for the same device, increments can be lost. Use an atomic counter (or synchronize) to keep connection counts reliable.

Copilot uses AI. Check for mistakes.
Comment thread ui/app/swagger/build.gradle Outdated
}

tasks.register('generateSources', Copy) {
duplicatesStrategy = 'include'
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this repo other Copy/Sync tasks set duplicatesStrategy using the DuplicatesStrategy enum (e.g. DuplicatesStrategy.EXCLUDE). Setting it to the string 'include' is inconsistent and may not be coerced correctly by Gradle; use DuplicatesStrategy.INCLUDE (and add the import if needed).

Copilot uses AI. Check for mistakes.
Comment on lines +330 to +333
cleanup:
channel.finish()
container.stop()
}
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cleanup block calls container.stop(), but this test method doesn't define a container variable (only decoder/channel/buffer). This will throw at runtime during cleanup; remove that call or create/stop a container in this test.

Copilot uses AI. Check for mistakes.
Comment on lines +84 to +93
if (buf.readableBytes() < 2) {
return; // Need more data
}

// Check for ping message
if (buf.readableBytes() == 1 && buf.getByte(buf.readerIndex()) == 0xFF) {
buf.readByte();
return; // Ignore ping
}

Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decodeTcp returns early when readableBytes() < 2, which makes the subsequent ping handling for a single 0xFF byte unreachable. Handle the 1-byte ping case before the < 2 guard (or adjust the guard) so the ping message can be consumed as intended.

Suggested change
if (buf.readableBytes() < 2) {
return; // Need more data
}
// Check for ping message
if (buf.readableBytes() == 1 && buf.getByte(buf.readerIndex()) == 0xFF) {
buf.readByte();
return; // Ignore ping
}
// Check for ping message (single 0xFF byte)
if (buf.readableBytes() == 1 && buf.getByte(buf.readerIndex()) == (byte) 0xFF) {
buf.readByte();
return; // Ignore ping
}
if (buf.readableBytes() < 2) {
return; // Need more data
}

Copilot uses AI. Check for mistakes.
Comment on lines +189 to +194
dataLength = buf.readUnsignedInt();
dataStartIndex = buf.readerIndex();

if (buf.readableBytes() < dataLength + 4) {
throw new IllegalStateException("Incomplete Teltonika TCP frame: expected dataLength=" + dataLength +
" plus CRC, but only " + buf.readableBytes() + " bytes available");
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For TCP, ByteToMessageDecoder can be called with partial frames; throwing when buf.readableBytes() < dataLength + 4 will break decoding on normal TCP fragmentation. Instead, reset the reader index (or avoid advancing it) and return until the full frame (payload + CRC) is available.

Suggested change
dataLength = buf.readUnsignedInt();
dataStartIndex = buf.readerIndex();
if (buf.readableBytes() < dataLength + 4) {
throw new IllegalStateException("Incomplete Teltonika TCP frame: expected dataLength=" + dataLength +
" plus CRC, but only " + buf.readableBytes() + " bytes available");
// Mark the current reader index so we can roll back if the full frame is not yet available
buf.markReaderIndex();
dataLength = buf.readUnsignedInt();
// Start of actual data (after the 4-byte length field)
dataStartIndex = buf.readerIndex();
// Ensure the full frame (data + 4-byte CRC) is available before proceeding
if (buf.readableBytes() < dataLength + 4) {
// Not enough data yet; reset to the marked position and wait for more bytes
buf.resetReaderIndex();
return null;

Copilot uses AI. Check for mistakes.
@MartinaeyNL
Copy link
Copy Markdown
Member

As it should not be merged to master, we'd better make it a draft right?
Therefore it cannot be merged by someone accidentally.

Copy link
Copy Markdown
Member

@MartinaeyNL MartinaeyNL left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, this was a very lengthy and exhausting PR to take a look at 😰
Please don't do this to us again, thanks 😅

I want to note that this definitely needs more eyes to get proper feedback on this.
I'm in no way familiar with the meaning behind this, as there is no linked issue that explains
why all this code needs to end up in the main repository. Even though it might become an extension,
it is still a lot of code and a lot of boilerplate.

I would think a lot of this code can be simplified.
It feels very complex for something that should be easy and accessible for other developers.
Don't know what you think about that @pankalog ?

Code-wise only have a few minor remarks on code quality / practices.
Please take a look at the Copilot comments as well.

@@ -0,0 +1,1044 @@
/*
* Copyright 2019, OpenRemote Inc.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the copyright headers to 2026

@TsIgnore
@ModelDescriptor(assetType = Asset.class, provider = MetaItemType.class)
@ModelDescriptor(assetType = Asset.class, provider = ValueType.class)
@ModelDescriptor(assetType = Asset.class, provider = TeltonikaParameters.class)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks odd to me

Comment on lines +12 to +15
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;
import java.util.stream.Stream;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either use a wildcard or specific classes.
Do you use some kind of static analysis tools to prevent these issues?

@@ -0,0 +1,243 @@
package org.openremote.model.telematics.teltonika;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing copyright header

return new Attribute<>(name, ValueType.TEXT, value != null ? value.toString() : null, timestamp);
}

private ByteBuf convertJsonValueToBinary(Object value, ParseableValueDescriptor<?> valueDescriptor) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds like a utility function we might already have.
And if not, we should about moving this outside of the telematics model.

return new Attribute<>(name, ValueType.TEXT, hexValue, timestamp);
}

private Attribute<?> createRawJsonAttribute(String parameterId, Object rawValue, long timestamp) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds like a utility function we might already have.
And if not, we should about moving this outside of the telematics model.

return VENDOR_PREFIX + normalizedId;
}

private Attribute<?> createRawAttribute(String parameterId, ByteBuf value, int length, long timestamp) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to move this into a utility function, instead of having it in the telematics-specific logic?

return new Attribute<>(name, ValueType.TEXT, normalized != null ? normalized.toString() : null, timestamp);
}

private Attribute<?> createFallbackTypedAttribute(String name, Object value, long timestamp) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a function we'd already have, or can be simplified,
instead of parsing all the object types manually like this.

Comment on lines +80 to +82



Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of unnecessary whitespace

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 44 out of 45 changed files in this pull request and generated 15 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review. Take the survey.

Comment on lines +84 to +93
if (buf.readableBytes() < 2) {
return; // Need more data
}

// Check for ping message
if (buf.readableBytes() == 1 && buf.getByte(buf.readerIndex()) == 0xFF) {
buf.readByte();
return; // Ignore ping
}

Comment on lines +193 to +194
throw new IllegalStateException("Incomplete Teltonika TCP frame: expected dataLength=" + dataLength +
" plus CRC, but only " + buf.readableBytes() + " bytes available");
Comment on lines +85 to +89
return canPublish(connection, securityContext, topic);
}

@Override
public boolean canPublish(RemotingConnection connection, KeycloakSecurityContext securityContext, Topic topic) {
Comment on lines +124 to +145
public void sendCommand(String realm, String clientId, String imei, DeviceCommand command) {
String topic = getCommandTopic(realm, clientId, imei);
MessageContext context = new MessageContext(MessageContext.Transport.MQTT)
.setRealm(realm)
.setDeviceId(imei)
.setCodecName(TeltonikaVendor.CODEC_JSON);

try {
ByteBuf encoded = mqttProtocol.encodeCommand(command, context)
.orElseThrow(() -> new ProtocolEncodeException(mqttProtocol.getProtocolId(), command.getCommand(), "Protocol returned empty payload"));
try {
byte[] bytes = new byte[encoded.readableBytes()];
encoded.getBytes(encoded.readerIndex(), bytes);
Object jsonPayload = ValueUtil.JSON.readTree(bytes);
publishMessage(topic, jsonPayload, MqttQoS.AT_LEAST_ONCE);
} finally {
encoded.release();
}
} catch (ProtocolEncodeException | IOException e) {
getLogger().warning("Failed to send Teltonika command to " + imei + ": " + e.getMessage());
}
}
public void stop(Container container) {
LOG.info("Stopping TelematicsService: vendors=" + vendorMap.size() + ", devices=" + deviceConnectionMap.size());
deviceConnectionMap.clear();
vendorMap.clear();
Comment on lines +19 to +27
public static final AttributeDescriptor<Integer> PRIORITY = new AttributeDescriptor<>("priority", TeltonikaParameters.PRIORITY).withOptional(true);
public static final AttributeDescriptor<Integer> ALTITUDE = new AttributeDescriptor<>("altitude", TeltonikaParameters.ALTITUDE).withOptional(true);
public static final AttributeDescriptor<Integer> DIRECTION = new AttributeDescriptor<>("direction", TeltonikaParameters.DIRECTION).withOptional(true);
public static final AttributeDescriptor<Integer> SATELLITES = new AttributeDescriptor<>("satellites", TeltonikaParameters.SATELLITES).withOptional(true);
public static final AttributeDescriptor<Integer> SPEED_SATELLITE = new AttributeDescriptor<>("speedSatellite", TeltonikaParameters.SPEED_SATELLITE).withOptional(true);
public static final AttributeDescriptor<Integer> EVENT_TRIGGERED = new AttributeDescriptor<>("eventTriggered", TeltonikaParameters.EVENT_TRIGGERED).withOptional(true);
public static final AttributeDescriptor<Long> TIMESTAMP = new AttributeDescriptor<>("timestamp", ValueType.TIMESTAMP).withOptional(true);
public static final AttributeDescriptor<GeoJSONPoint> GPS_LOCATION = new AttributeDescriptor<>("gpsLocation", TeltonikaParameters.LOCATION).withOptional(true);
public static final AttributeDescriptor<Integer> SPEED = new AttributeDescriptor<>("speed", TeltonikaParameters.SPEED).withOptional(true);
Comment on lines +20 to +45
public interface ParameterRegistry<D extends ParseableValueDescriptor<?>> {

/**
* Get the vendor prefix used for attribute naming (e.g., "teltonika", "queclink").
*
* @return The vendor prefix string
*/
String getVendorPrefix();

/**
* Get a descriptor by its protocol-specific ID (e.g., AVL ID for Teltonika).
*
* @param id The protocol-specific parameter ID
* @return The descriptor if found, or empty if not registered
*/
Optional<D> getById(String id);

/**
* Get a descriptor by its full name (including vendor prefix).
*
* @param name The full attribute name (e.g., "teltonika_239")
* @return The descriptor if found, or empty if not registered
*/
Optional<D> getByName(String name);

/**
Comment on lines +73 to +80
for (Attribute<?> attribute : message.getAttributes()) {
if (TeltonikaTrackerAsset.GPS_LOCATION.getName().equals(attribute.getName())) {
attribute = new Attribute<>(Asset.LOCATION, (GeoJSONPoint) attribute.getValue().get(), attribute.getTimestamp().get());
}
if(TeltonikaParameters.TIMESTAMP.getName().equals(attribute.getName())) {
attribute = new Attribute<>(TeltonikaTrackerAsset.TIMESTAMP, (Long) attribute.getValue().get(), attribute.getTimestamp().get());
}

Comment on lines +112 to +114
default boolean isTimedOut(long timeoutSeconds) {
return getLastSeen().plusSeconds(timeoutSeconds).isBefore(Instant.now());
}
Comment on lines +68 to +80
if (topic.getTokens().length < 5) {
getLogger().warning(MessageFormat.format("Topic {0} is not a valid Topic. Please use a valid Topic.", topic));
return false;
}
try {
Long.parseLong(topic.getTokens()[3]);
} catch (NumberFormatException e) {
getLogger().warning(MessageFormat.format("IMEI {0} is not a valid IMEI value. Please use a valid IMEI value.", topic.getTokens()[3]));
return false;
}
return Objects.equals(topic.getTokens()[2], TELTONIKA_DEVICE_TOKEN)
&& (Objects.equals(topic.getTokens()[4], TELTONIKA_DEVICE_RECEIVE_TOPIC)
|| Objects.equals(topic.getTokens()[4], TELTONIKA_DEVICE_SEND_TOPIC));
@@ -0,0 +1,46 @@
package org.openremote.agent.protocol.teltonika;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add all the missing copyright headers to new files.

Comment on lines +9 to +14
* Each record contains:
* - IMEI (device identifier)
* - Timestamp
* - GPS location
* - Validity flag
* - IO elements (parsed as OpenRemote attributes)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without using <ul> and <li> tags such JavaDocs are hard to read. Please update similar JavaDocs as well.

Image

package org.openremote.agent.protocol.teltonika;

import org.openremote.model.attribute.AttributeMap;
import org.openremote.model.geo.GeoJSONPoint;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused:

Suggested change
import org.openremote.model.geo.GeoJSONPoint;

Comment on lines +228 to +258
public List<DeviceConnection> getTrackerStates() {
return List.copyOf(deviceConnectionMap.values());
}

public List<DeviceConnection> getTrackerStatesByVendor(String vendorId) {
return deviceConnectionMap.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(vendorId + ":"))
.map(Map.Entry::getValue)
.toList();
}

public Optional<DeviceConnection> removeTrackerState(String vendorId, String deviceId) {
return Optional.ofNullable(deviceConnectionMap.remove(toDeviceKey(vendorId, deviceId)));
}

public boolean isDeviceConnected(String vendorId, String deviceId) {
return getTrackerState(vendorId, deviceId)
.map(DeviceConnection::isConnected)
.orElse(false);
}

public int getConnectedDeviceCount() {
int count = 0;
for (DeviceConnection connection : deviceConnectionMap.values()) {
if (connection.isConnected()) {
count++;
}
}
return count;
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are there so many unused methods in this class?

Image


Optional<C> fromInboundResponse(M message);
}
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why so many unused methods in this class?

Image

Comment on lines +75 to +99
/**
* Called when a message is received from the device.
* Updates last seen time.
*
* @param message The decoded message (list of Attributes)
*/
void onMessage(DeviceMessage message);

/**
* Called when the device connects.
*/
void onConnect();

/**
* Called when the device disconnects.
*/
void onDisconnect();

/**
* Queue a command to send to the device.
*
* @param command The command to send
* @return true if the command was queued successfully
*/
boolean queueCommand(DeviceCommand command);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are these methods never called? As a result lastSeen won't update correctly and it will not send commands to devices.

* @param timeoutSeconds Sessions idle longer than this are removed
* @return The number of sessions removed
*/
int cleanupTimedOut(long timeoutSeconds);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is never called. Will this result in sessions that timed out never to be cleaned up causing a memory leak?

/**
* Called when a session receives a message.
*/
void onSessionMessage(DeviceSession session);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's never called.


return dynamicParameters.computeIfAbsent(id, k -> {
LOG.fine("Dynamic parameter: " + id + " (length=" + byteLength + ")");
return (DynamicParameter<?>) DynamicParameter.fromByteLength(getVendorPrefix(), id, byteLength);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant cast:

Suggested change
return (DynamicParameter<?>) DynamicParameter.fromByteLength(getVendorPrefix(), id, byteLength);
return DynamicParameter.fromByteLength(getVendorPrefix(), id, byteLength);

@@ -0,0 +1,255 @@
package org.openremote.manager.mqtt
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this test not grouped with the other tests in the org.openremote.test package?

@wborn
Copy link
Copy Markdown
Member

wborn commented Mar 20, 2026

@codex review

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 4153703cbc

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +192 to +194
if (buf.readableBytes() < dataLength + 4) {
throw new IllegalStateException("Incomplete Teltonika TCP frame: expected dataLength=" + dataLength +
" plus CRC, but only " + buf.readableBytes() + " bytes available");
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Wait for the rest of a TCP AVL frame before failing

parseData() currently throws whenever the full dataLength + CRC payload is not already buffered. On TCP that is a normal condition—Netty may deliver a Teltonika frame across multiple reads—so a valid tracker packet that arrives fragmented will be rejected and the socket closed instead of being reassembled. I checked the new TeltonikaProtocolDecoder path, and because decodeTcp() has already consumed the preamble before calling this method, there is no later chance to recover that frame.

Useful? React with 👍 / 👎.

Comment on lines +84 to +88
if (buf.readableBytes() < 2) {
return; // Need more data
}

// Check for ping message
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Handle 0xFF pings before the short-buffer return

The one-byte Teltonika ping can never reach the ping branch below, because the method returns first when readableBytes() < 2. That leaves the 0xFF byte stuck in Netty's cumulation; when the device sends its next IMEI or AVL frame on the same TCP connection, decoding resumes from that stale byte and the next message is parsed with an invalid length/preamble.

Useful? React with 👍 / 👎.

Comment on lines +65 to +69
boolean isNew = newAttributes.stream().anyMatch(a -> Objects.equals(a.getName(), attribute.getName()));
if (isNew || attribute.getValue().isEmpty()) {
continue;
}
assetProcessingService.sendAttributeEvent(new AttributeEvent(asset.getId(), attribute.getName(), attribute.getValue().get()));
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Emit events using the normalized attribute names

This sends AttributeEvents with the original names from message.getAttributes(), even when TeltonikaAssetMapper.preProcessAttributes() has already rewritten them before persistence (for example gpsLocation becomes the canonical Asset.LOCATION). AssetProcessingService resolves events by attribute name, so those rewritten fields are rejected as ATTRIBUTE_NOT_FOUND; the asset row is updated, but live consumers/rules never receive the location update through the normal event pipeline.

Useful? React with 👍 / 👎.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Fleet Management Architecture Rewrite

4 participants