Fleet Telematics Integration Architecture rewrite, Teltonika Integration rewrite#2571
Fleet Telematics Integration Architecture rewrite, Teltonika Integration rewrite#2571pankalog wants to merge 26 commits into
Conversation
…etc. Needs further work in the attribute generation and parsing. More AVL IDs should also be added from other trackers, and also add the parsing logic from the old version if the attribute is unknown.
…I'm almost ready for review
…I'm almost ready for review
# Conflicts: # manager/src/map/mapdata.mbtiles # ui/component/or-vaadin-components/src/or-vaadin-number-field.ts # ui/component/or-vaadin-components/src/or-vaadin-password-field.ts # ui/component/or-vaadin-components/src/or-vaadin-text-area.ts # ui/component/or-vaadin-components/src/or-vaadin-text-field.ts # ui/component/or-vaadin-components/stories/or-vaadin-text-field.stories.ts
There was a problem hiding this comment.
Pull request overview
Ground-up rewrite of the telematics integration to introduce a unified, vendor-driven ingestion/processing pipeline (MQTT/TCP/UDP all produce DeviceMessage and flow through TelematicsService), with a full Teltonika implementation (registry/descriptor parsing, asset mapping, agent + Netty decoder) and accompanying tests.
Changes:
- Introduces core telematics model abstractions (
DeviceMessage, envelope/handler/publisher, protocol + parameter parsing APIs) and centralizes processing inTelematicsService(queue-backed). - Adds Teltonika vendor implementation across model/manager/agent layers (parameter registry + resolver, tracker asset, asset mapper, MQTT handler, TCP/UDP servers + protocol decoder).
- Adds extensive integration/decoder/MQTT command tests and updates build/service registration wiring.
Reviewed changes
Copilot reviewed 45 out of 46 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| ui/app/swagger/build.gradle | Adjusts swagger UI resource generation task duplicate handling. |
| test/src/test/groovy/org/openremote/test/protocol/mqtt/TeltonikaMQTTClientProtocolTest.groovy | Adds large end-to-end and decoder tests for Teltonika MQTT/TCP/UDP flows. |
| test/src/test/groovy/org/openremote/manager/mqtt/TeltonikaMQTTHandlerCommandTest.groovy | Adds unit + end-to-end tests for Teltonika MQTT command and ingest processing. |
| model/src/main/java/org/openremote/model/telematics/teltonika/TeltonikaTrackerAsset.java | Defines Teltonika tracker asset and attribute descriptors. |
| model/src/main/java/org/openremote/model/telematics/teltonika/TeltonikaRegistry.java | Implements Teltonika parameter registry + dynamic parameter creation + descriptor matching. |
| model/src/main/java/org/openremote/model/telematics/teltonika/TeltonikaParameter.java | Defines Teltonika parseable parameter descriptor/builder with AVL metadata. |
| model/src/main/java/org/openremote/model/telematics/teltonika/TeltonikaAttributeResolver.java | Resolves Teltonika parameters into typed OpenRemote attributes for JSON/binary. |
| model/src/main/java/org/openremote/model/telematics/teltonika/TeltonikaAssetMapper.java | Creates/provisions Teltonika assets and normalizes well-known attributes. |
| model/src/main/java/org/openremote/model/telematics/session/DeviceSessionManager.java | Adds device session manager abstraction. |
| model/src/main/java/org/openremote/model/telematics/session/DeviceSession.java | Adds device session abstraction/state model. |
| model/src/main/java/org/openremote/model/telematics/session/DeviceConnection.java | Adds runtime device connection state tracking. |
| model/src/main/java/org/openremote/model/telematics/protocol/ProtocolEncodeException.java | Adds protocol encode exception type. |
| model/src/main/java/org/openremote/model/telematics/protocol/ProtocolDecodeException.java | Adds protocol decode exception type. |
| model/src/main/java/org/openremote/model/telematics/protocol/MessageContext.java | Adds mutable per-message/per-connection context (transport, deviceId, etc.). |
| model/src/main/java/org/openremote/model/telematics/protocol/DeviceProtocol.java | Defines vendor protocol API for decode/encode/acks/identification. |
| model/src/main/java/org/openremote/model/telematics/protocol/DeviceCommand.java | Adds a simple device command wrapper. |
| model/src/main/java/org/openremote/model/telematics/parameter/TelematicsParameterRegistry.java | Defines vendor parameter registry API (known + dynamic). |
| model/src/main/java/org/openremote/model/telematics/parameter/StandardParsers.java | Adds standard binary parsers/inference helpers. |
| model/src/main/java/org/openremote/model/telematics/parameter/ParseableValueDescriptor.java | Extends ValueDescriptor with parsing capability + helpers. |
| model/src/main/java/org/openremote/model/telematics/parameter/ParameterRegistry.java | Adds an additional registry interface (generic descriptor lookup). |
| model/src/main/java/org/openremote/model/telematics/parameter/ParameterParser.java | Adds parser strategy interface for byte buffer decoding. |
| model/src/main/java/org/openremote/model/telematics/parameter/ParameterParseException.java | Adds parse exception type for parameter parsing failures. |
| model/src/main/java/org/openremote/model/telematics/parameter/DynamicParameter.java | Adds dynamic (unknown-ID) parameter descriptor generation. |
| model/src/main/java/org/openremote/model/telematics/core/TelematicsMessagePublisher.java | Adds publisher interface for submitting decoded messages to the pipeline. |
| model/src/main/java/org/openremote/model/telematics/core/TelematicsMessageHandler.java | Adds handler interface for vendor-specific asset update processing. |
| model/src/main/java/org/openremote/model/telematics/core/TelematicsMessageEnvelope.java | Adds immutable queue envelope for message processing. |
| model/src/main/java/org/openremote/model/telematics/core/DeviceMessage.java | Adds normalized message model (deviceId + protocol + attribute list). |
| model/src/main/java/org/openremote/model/telematics/asset/TelematicsAssetMapper.java | Adds vendor asset mapper API for provisioning and attribute application. |
| model/src/main/java/org/openremote/model/geo/GeoJSONPoint.java | Adds equals/hashCode override for GeoJSON point equality. |
| model/src/main/java/org/openremote/model/geo/GeoJSON.java | Adds equals/hashCode base implementation. |
| model/src/main/java/org/openremote/model/StandardModelProvider.java | Registers TeltonikaParameters as a model descriptor provider. |
| model/build.gradle | Adds Netty buffer dependency to model module. |
| manager/src/main/resources/META-INF/services/org.openremote.model.ContainerService | Registers TelematicsService as a container service. |
| manager/src/main/resources/META-INF/services/org.openremote.manager.mqtt.MQTTHandler | Registers TeltonikaMQTTHandler as an MQTT handler. |
| manager/src/main/java/org/openremote/manager/telematics/TeltonikaVendor.java | Implements Teltonika vendor, protocols, command mapping, and session manager. |
| manager/src/main/java/org/openremote/manager/telematics/TeltonikaMessageHandler.java | Implements vendor message handler that provisions/updates assets and emits events. |
| manager/src/main/java/org/openremote/manager/telematics/TeltonikaMQTTHandler.java | Implements MQTT ingress handler for Teltonika device topics and command publishing. |
| manager/src/main/java/org/openremote/manager/telematics/TelematicsVendor.java | Adds vendor interface describing protocols/registries/mappers/handlers. |
| manager/src/main/java/org/openremote/manager/telematics/TelematicsService.java | Adds central telematics queue-backed processing + connection tracking. |
| agent/src/main/java/org/openremote/agent/protocol/teltonika/TeltonikaUdpServer.java | Adds UDP server wrapper wiring Teltonika decoder. |
| agent/src/main/java/org/openremote/agent/protocol/teltonika/TeltonikaTcpServer.java | Adds TCP server wrapper wiring Teltonika decoder. |
| agent/src/main/java/org/openremote/agent/protocol/teltonika/TeltonikaRecord.java | Adds record model for decoded Teltonika AVL frames. |
| agent/src/main/java/org/openremote/agent/protocol/teltonika/TeltonikaProtocolDecoder.java | Adds Netty decoder for Teltonika AVL codecs (TCP/UDP). |
| agent/src/main/java/org/openremote/agent/protocol/teltonika/TeltonikaProtocol.java | Updates agent protocol to publish decoded messages via TelematicsMessagePublisher. |
| agent/src/main/java/org/openremote/agent/protocol/teltonika/TeltonikaAgent.java | Adds agent asset for running Teltonika TCP/UDP servers. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| List<Attribute<?>> newAttributes = assetMapper.applyAttributes(asset, message); | ||
| assetStorageService.merge(asset); | ||
|
|
||
| for (Attribute<?> attribute : message.getAttributes()) { | ||
| boolean isNew = newAttributes.stream().anyMatch(a -> Objects.equals(a.getName(), attribute.getName())); | ||
| if (isNew || attribute.getValue().isEmpty()) { | ||
| continue; | ||
| } | ||
| assetProcessingService.sendAttributeEvent(new AttributeEvent(asset.getId(), attribute.getName(), attribute.getValue().get())); |
There was a problem hiding this comment.
Attribute events are emitted using message.getAttributes(), but applyAttributes() preprocesses/renames attributes (e.g. gpsLocation -> location) without mutating the DeviceMessage. This can cause events to be published under names that were not actually applied to the asset. Emit events from the pre-processed/applied attribute list instead (and use the same names stored on the asset).
| List<Attribute<?>> newAttributes = assetMapper.applyAttributes(asset, message); | |
| assetStorageService.merge(asset); | |
| for (Attribute<?> attribute : message.getAttributes()) { | |
| boolean isNew = newAttributes.stream().anyMatch(a -> Objects.equals(a.getName(), attribute.getName())); | |
| if (isNew || attribute.getValue().isEmpty()) { | |
| continue; | |
| } | |
| assetProcessingService.sendAttributeEvent(new AttributeEvent(asset.getId(), attribute.getName(), attribute.getValue().get())); | |
| List<Attribute<?>> appliedAttributes = assetMapper.applyAttributes(asset, message); | |
| assetStorageService.merge(asset); | |
| for (Attribute<?> attribute : appliedAttributes) { | |
| if (attribute.getValue().isEmpty()) { | |
| continue; | |
| } | |
| assetProcessingService.sendAttributeEvent( | |
| new AttributeEvent(asset.getId(), attribute.getName(), attribute.getValue().get()) | |
| ); |
| public void incrementMessageCount() { | ||
| this.messageCount++; | ||
| } |
There was a problem hiding this comment.
messageCount++ is not atomic; with concurrentConsumers=4 on the telematics queue, multiple threads can update the same DeviceConnection concurrently and lose increments. Use an AtomicLong (or LongAdder) or synchronize updates so message counts remain accurate.
| public void incrementConnectionCount() { | ||
| this.connectionCount++; | ||
| } |
There was a problem hiding this comment.
connectionCount++ is not atomic; if connection state updates happen concurrently for the same device, increments can be lost. Use an atomic counter (or synchronize) to keep connection counts reliable.
| } | ||
|
|
||
| tasks.register('generateSources', Copy) { | ||
| duplicatesStrategy = 'include' |
There was a problem hiding this comment.
In this repo other Copy/Sync tasks set duplicatesStrategy using the DuplicatesStrategy enum (e.g. DuplicatesStrategy.EXCLUDE). Setting it to the string 'include' is inconsistent and may not be coerced correctly by Gradle; use DuplicatesStrategy.INCLUDE (and add the import if needed).
| cleanup: | ||
| channel.finish() | ||
| container.stop() | ||
| } |
There was a problem hiding this comment.
The cleanup block calls container.stop(), but this test method doesn't define a container variable (only decoder/channel/buffer). This will throw at runtime during cleanup; remove that call or create/stop a container in this test.
| if (buf.readableBytes() < 2) { | ||
| return; // Need more data | ||
| } | ||
|
|
||
| // Check for ping message | ||
| if (buf.readableBytes() == 1 && buf.getByte(buf.readerIndex()) == 0xFF) { | ||
| buf.readByte(); | ||
| return; // Ignore ping | ||
| } | ||
|
|
There was a problem hiding this comment.
decodeTcp returns early when readableBytes() < 2, which makes the subsequent ping handling for a single 0xFF byte unreachable. Handle the 1-byte ping case before the < 2 guard (or adjust the guard) so the ping message can be consumed as intended.
| if (buf.readableBytes() < 2) { | |
| return; // Need more data | |
| } | |
| // Check for ping message | |
| if (buf.readableBytes() == 1 && buf.getByte(buf.readerIndex()) == 0xFF) { | |
| buf.readByte(); | |
| return; // Ignore ping | |
| } | |
| // Check for ping message (single 0xFF byte) | |
| if (buf.readableBytes() == 1 && buf.getByte(buf.readerIndex()) == (byte) 0xFF) { | |
| buf.readByte(); | |
| return; // Ignore ping | |
| } | |
| if (buf.readableBytes() < 2) { | |
| return; // Need more data | |
| } |
| dataLength = buf.readUnsignedInt(); | ||
| dataStartIndex = buf.readerIndex(); | ||
|
|
||
| if (buf.readableBytes() < dataLength + 4) { | ||
| throw new IllegalStateException("Incomplete Teltonika TCP frame: expected dataLength=" + dataLength + | ||
| " plus CRC, but only " + buf.readableBytes() + " bytes available"); |
There was a problem hiding this comment.
For TCP, ByteToMessageDecoder can be called with partial frames; throwing when buf.readableBytes() < dataLength + 4 will break decoding on normal TCP fragmentation. Instead, reset the reader index (or avoid advancing it) and return until the full frame (payload + CRC) is available.
| dataLength = buf.readUnsignedInt(); | |
| dataStartIndex = buf.readerIndex(); | |
| if (buf.readableBytes() < dataLength + 4) { | |
| throw new IllegalStateException("Incomplete Teltonika TCP frame: expected dataLength=" + dataLength + | |
| " plus CRC, but only " + buf.readableBytes() + " bytes available"); | |
| // Mark the current reader index so we can roll back if the full frame is not yet available | |
| buf.markReaderIndex(); | |
| dataLength = buf.readUnsignedInt(); | |
| // Start of actual data (after the 4-byte length field) | |
| dataStartIndex = buf.readerIndex(); | |
| // Ensure the full frame (data + 4-byte CRC) is available before proceeding | |
| if (buf.readableBytes() < dataLength + 4) { | |
| // Not enough data yet; reset to the marked position and wait for more bytes | |
| buf.resetReaderIndex(); | |
| return null; |
|
As it should not be merged to |
There was a problem hiding this comment.
Well, this was a very lengthy and exhausting PR to take a look at 😰
Please don't do this to us again, thanks 😅
I want to note that this definitely needs more eyes to get proper feedback on this.
I'm in no way familiar with the meaning behind this, as there is no linked issue that explains
why all this code needs to end up in the main repository. Even though it might become an extension,
it is still a lot of code and a lot of boilerplate.
I would think a lot of this code can be simplified.
It feels very complex for something that should be easy and accessible for other developers.
Don't know what you think about that @pankalog ?
Code-wise only have a few minor remarks on code quality / practices.
Please take a look at the Copilot comments as well.
| @@ -0,0 +1,1044 @@ | |||
| /* | |||
| * Copyright 2019, OpenRemote Inc. | |||
There was a problem hiding this comment.
Please update the copyright headers to 2026
| @TsIgnore | ||
| @ModelDescriptor(assetType = Asset.class, provider = MetaItemType.class) | ||
| @ModelDescriptor(assetType = Asset.class, provider = ValueType.class) | ||
| @ModelDescriptor(assetType = Asset.class, provider = TeltonikaParameters.class) |
| import java.util.*; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.logging.Logger; | ||
| import java.util.stream.Stream; |
There was a problem hiding this comment.
Either use a wildcard or specific classes.
Do you use some kind of static analysis tools to prevent these issues?
| @@ -0,0 +1,243 @@ | |||
| package org.openremote.model.telematics.teltonika; | |||
| return new Attribute<>(name, ValueType.TEXT, value != null ? value.toString() : null, timestamp); | ||
| } | ||
|
|
||
| private ByteBuf convertJsonValueToBinary(Object value, ParseableValueDescriptor<?> valueDescriptor) { |
There was a problem hiding this comment.
This sounds like a utility function we might already have.
And if not, we should about moving this outside of the telematics model.
| return new Attribute<>(name, ValueType.TEXT, hexValue, timestamp); | ||
| } | ||
|
|
||
| private Attribute<?> createRawJsonAttribute(String parameterId, Object rawValue, long timestamp) { |
There was a problem hiding this comment.
This sounds like a utility function we might already have.
And if not, we should about moving this outside of the telematics model.
| return VENDOR_PREFIX + normalizedId; | ||
| } | ||
|
|
||
| private Attribute<?> createRawAttribute(String parameterId, ByteBuf value, int length, long timestamp) { |
There was a problem hiding this comment.
Does it make sense to move this into a utility function, instead of having it in the telematics-specific logic?
| return new Attribute<>(name, ValueType.TEXT, normalized != null ? normalized.toString() : null, timestamp); | ||
| } | ||
|
|
||
| private Attribute<?> createFallbackTypedAttribute(String name, Object value, long timestamp) { |
There was a problem hiding this comment.
This looks like a function we'd already have, or can be simplified,
instead of parsing all the object types manually like this.
|
|
||
|
|
||
|
|
There was a problem hiding this comment.
A lot of unnecessary whitespace
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 44 out of 45 changed files in this pull request and generated 15 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| if (buf.readableBytes() < 2) { | ||
| return; // Need more data | ||
| } | ||
|
|
||
| // Check for ping message | ||
| if (buf.readableBytes() == 1 && buf.getByte(buf.readerIndex()) == 0xFF) { | ||
| buf.readByte(); | ||
| return; // Ignore ping | ||
| } | ||
|
|
| throw new IllegalStateException("Incomplete Teltonika TCP frame: expected dataLength=" + dataLength + | ||
| " plus CRC, but only " + buf.readableBytes() + " bytes available"); |
| return canPublish(connection, securityContext, topic); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean canPublish(RemotingConnection connection, KeycloakSecurityContext securityContext, Topic topic) { |
| public void sendCommand(String realm, String clientId, String imei, DeviceCommand command) { | ||
| String topic = getCommandTopic(realm, clientId, imei); | ||
| MessageContext context = new MessageContext(MessageContext.Transport.MQTT) | ||
| .setRealm(realm) | ||
| .setDeviceId(imei) | ||
| .setCodecName(TeltonikaVendor.CODEC_JSON); | ||
|
|
||
| try { | ||
| ByteBuf encoded = mqttProtocol.encodeCommand(command, context) | ||
| .orElseThrow(() -> new ProtocolEncodeException(mqttProtocol.getProtocolId(), command.getCommand(), "Protocol returned empty payload")); | ||
| try { | ||
| byte[] bytes = new byte[encoded.readableBytes()]; | ||
| encoded.getBytes(encoded.readerIndex(), bytes); | ||
| Object jsonPayload = ValueUtil.JSON.readTree(bytes); | ||
| publishMessage(topic, jsonPayload, MqttQoS.AT_LEAST_ONCE); | ||
| } finally { | ||
| encoded.release(); | ||
| } | ||
| } catch (ProtocolEncodeException | IOException e) { | ||
| getLogger().warning("Failed to send Teltonika command to " + imei + ": " + e.getMessage()); | ||
| } | ||
| } |
| public void stop(Container container) { | ||
| LOG.info("Stopping TelematicsService: vendors=" + vendorMap.size() + ", devices=" + deviceConnectionMap.size()); | ||
| deviceConnectionMap.clear(); | ||
| vendorMap.clear(); |
| public static final AttributeDescriptor<Integer> PRIORITY = new AttributeDescriptor<>("priority", TeltonikaParameters.PRIORITY).withOptional(true); | ||
| public static final AttributeDescriptor<Integer> ALTITUDE = new AttributeDescriptor<>("altitude", TeltonikaParameters.ALTITUDE).withOptional(true); | ||
| public static final AttributeDescriptor<Integer> DIRECTION = new AttributeDescriptor<>("direction", TeltonikaParameters.DIRECTION).withOptional(true); | ||
| public static final AttributeDescriptor<Integer> SATELLITES = new AttributeDescriptor<>("satellites", TeltonikaParameters.SATELLITES).withOptional(true); | ||
| public static final AttributeDescriptor<Integer> SPEED_SATELLITE = new AttributeDescriptor<>("speedSatellite", TeltonikaParameters.SPEED_SATELLITE).withOptional(true); | ||
| public static final AttributeDescriptor<Integer> EVENT_TRIGGERED = new AttributeDescriptor<>("eventTriggered", TeltonikaParameters.EVENT_TRIGGERED).withOptional(true); | ||
| public static final AttributeDescriptor<Long> TIMESTAMP = new AttributeDescriptor<>("timestamp", ValueType.TIMESTAMP).withOptional(true); | ||
| public static final AttributeDescriptor<GeoJSONPoint> GPS_LOCATION = new AttributeDescriptor<>("gpsLocation", TeltonikaParameters.LOCATION).withOptional(true); | ||
| public static final AttributeDescriptor<Integer> SPEED = new AttributeDescriptor<>("speed", TeltonikaParameters.SPEED).withOptional(true); |
| public interface ParameterRegistry<D extends ParseableValueDescriptor<?>> { | ||
|
|
||
| /** | ||
| * Get the vendor prefix used for attribute naming (e.g., "teltonika", "queclink"). | ||
| * | ||
| * @return The vendor prefix string | ||
| */ | ||
| String getVendorPrefix(); | ||
|
|
||
| /** | ||
| * Get a descriptor by its protocol-specific ID (e.g., AVL ID for Teltonika). | ||
| * | ||
| * @param id The protocol-specific parameter ID | ||
| * @return The descriptor if found, or empty if not registered | ||
| */ | ||
| Optional<D> getById(String id); | ||
|
|
||
| /** | ||
| * Get a descriptor by its full name (including vendor prefix). | ||
| * | ||
| * @param name The full attribute name (e.g., "teltonika_239") | ||
| * @return The descriptor if found, or empty if not registered | ||
| */ | ||
| Optional<D> getByName(String name); | ||
|
|
||
| /** |
| for (Attribute<?> attribute : message.getAttributes()) { | ||
| if (TeltonikaTrackerAsset.GPS_LOCATION.getName().equals(attribute.getName())) { | ||
| attribute = new Attribute<>(Asset.LOCATION, (GeoJSONPoint) attribute.getValue().get(), attribute.getTimestamp().get()); | ||
| } | ||
| if(TeltonikaParameters.TIMESTAMP.getName().equals(attribute.getName())) { | ||
| attribute = new Attribute<>(TeltonikaTrackerAsset.TIMESTAMP, (Long) attribute.getValue().get(), attribute.getTimestamp().get()); | ||
| } | ||
|
|
| default boolean isTimedOut(long timeoutSeconds) { | ||
| return getLastSeen().plusSeconds(timeoutSeconds).isBefore(Instant.now()); | ||
| } |
| if (topic.getTokens().length < 5) { | ||
| getLogger().warning(MessageFormat.format("Topic {0} is not a valid Topic. Please use a valid Topic.", topic)); | ||
| return false; | ||
| } | ||
| try { | ||
| Long.parseLong(topic.getTokens()[3]); | ||
| } catch (NumberFormatException e) { | ||
| getLogger().warning(MessageFormat.format("IMEI {0} is not a valid IMEI value. Please use a valid IMEI value.", topic.getTokens()[3])); | ||
| return false; | ||
| } | ||
| return Objects.equals(topic.getTokens()[2], TELTONIKA_DEVICE_TOKEN) | ||
| && (Objects.equals(topic.getTokens()[4], TELTONIKA_DEVICE_RECEIVE_TOPIC) | ||
| || Objects.equals(topic.getTokens()[4], TELTONIKA_DEVICE_SEND_TOPIC)); |
| @@ -0,0 +1,46 @@ | |||
| package org.openremote.agent.protocol.teltonika; | |||
There was a problem hiding this comment.
Please add all the missing copyright headers to new files.
| * Each record contains: | ||
| * - IMEI (device identifier) | ||
| * - Timestamp | ||
| * - GPS location | ||
| * - Validity flag | ||
| * - IO elements (parsed as OpenRemote attributes) |
| package org.openremote.agent.protocol.teltonika; | ||
|
|
||
| import org.openremote.model.attribute.AttributeMap; | ||
| import org.openremote.model.geo.GeoJSONPoint; |
There was a problem hiding this comment.
Unused:
| import org.openremote.model.geo.GeoJSONPoint; |
| public List<DeviceConnection> getTrackerStates() { | ||
| return List.copyOf(deviceConnectionMap.values()); | ||
| } | ||
|
|
||
| public List<DeviceConnection> getTrackerStatesByVendor(String vendorId) { | ||
| return deviceConnectionMap.entrySet().stream() | ||
| .filter(entry -> entry.getKey().startsWith(vendorId + ":")) | ||
| .map(Map.Entry::getValue) | ||
| .toList(); | ||
| } | ||
|
|
||
| public Optional<DeviceConnection> removeTrackerState(String vendorId, String deviceId) { | ||
| return Optional.ofNullable(deviceConnectionMap.remove(toDeviceKey(vendorId, deviceId))); | ||
| } | ||
|
|
||
| public boolean isDeviceConnected(String vendorId, String deviceId) { | ||
| return getTrackerState(vendorId, deviceId) | ||
| .map(DeviceConnection::isConnected) | ||
| .orElse(false); | ||
| } | ||
|
|
||
| public int getConnectedDeviceCount() { | ||
| int count = 0; | ||
| for (DeviceConnection connection : deviceConnectionMap.values()) { | ||
| if (connection.isConnected()) { | ||
| count++; | ||
| } | ||
| } | ||
| return count; | ||
| } | ||
|
|
|
|
||
| Optional<C> fromInboundResponse(M message); | ||
| } | ||
| } |
| /** | ||
| * Called when a message is received from the device. | ||
| * Updates last seen time. | ||
| * | ||
| * @param message The decoded message (list of Attributes) | ||
| */ | ||
| void onMessage(DeviceMessage message); | ||
|
|
||
| /** | ||
| * Called when the device connects. | ||
| */ | ||
| void onConnect(); | ||
|
|
||
| /** | ||
| * Called when the device disconnects. | ||
| */ | ||
| void onDisconnect(); | ||
|
|
||
| /** | ||
| * Queue a command to send to the device. | ||
| * | ||
| * @param command The command to send | ||
| * @return true if the command was queued successfully | ||
| */ | ||
| boolean queueCommand(DeviceCommand command); |
There was a problem hiding this comment.
Why are these methods never called? As a result lastSeen won't update correctly and it will not send commands to devices.
| * @param timeoutSeconds Sessions idle longer than this are removed | ||
| * @return The number of sessions removed | ||
| */ | ||
| int cleanupTimedOut(long timeoutSeconds); |
There was a problem hiding this comment.
This method is never called. Will this result in sessions that timed out never to be cleaned up causing a memory leak?
| /** | ||
| * Called when a session receives a message. | ||
| */ | ||
| void onSessionMessage(DeviceSession session); |
|
|
||
| return dynamicParameters.computeIfAbsent(id, k -> { | ||
| LOG.fine("Dynamic parameter: " + id + " (length=" + byteLength + ")"); | ||
| return (DynamicParameter<?>) DynamicParameter.fromByteLength(getVendorPrefix(), id, byteLength); |
There was a problem hiding this comment.
Redundant cast:
| return (DynamicParameter<?>) DynamicParameter.fromByteLength(getVendorPrefix(), id, byteLength); | |
| return DynamicParameter.fromByteLength(getVendorPrefix(), id, byteLength); |
| @@ -0,0 +1,255 @@ | |||
| package org.openremote.manager.mqtt | |||
There was a problem hiding this comment.
Why is this test not grouped with the other tests in the org.openremote.test package?
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 4153703cbc
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| if (buf.readableBytes() < dataLength + 4) { | ||
| throw new IllegalStateException("Incomplete Teltonika TCP frame: expected dataLength=" + dataLength + | ||
| " plus CRC, but only " + buf.readableBytes() + " bytes available"); |
There was a problem hiding this comment.
Wait for the rest of a TCP AVL frame before failing
parseData() currently throws whenever the full dataLength + CRC payload is not already buffered. On TCP that is a normal condition—Netty may deliver a Teltonika frame across multiple reads—so a valid tracker packet that arrives fragmented will be rejected and the socket closed instead of being reassembled. I checked the new TeltonikaProtocolDecoder path, and because decodeTcp() has already consumed the preamble before calling this method, there is no later chance to recover that frame.
Useful? React with 👍 / 👎.
| if (buf.readableBytes() < 2) { | ||
| return; // Need more data | ||
| } | ||
|
|
||
| // Check for ping message |
There was a problem hiding this comment.
Handle 0xFF pings before the short-buffer return
The one-byte Teltonika ping can never reach the ping branch below, because the method returns first when readableBytes() < 2. That leaves the 0xFF byte stuck in Netty's cumulation; when the device sends its next IMEI or AVL frame on the same TCP connection, decoding resumes from that stale byte and the next message is parsed with an invalid length/preamble.
Useful? React with 👍 / 👎.
| boolean isNew = newAttributes.stream().anyMatch(a -> Objects.equals(a.getName(), attribute.getName())); | ||
| if (isNew || attribute.getValue().isEmpty()) { | ||
| continue; | ||
| } | ||
| assetProcessingService.sendAttributeEvent(new AttributeEvent(asset.getId(), attribute.getName(), attribute.getValue().get())); |
There was a problem hiding this comment.
Emit events using the normalized attribute names
This sends AttributeEvents with the original names from message.getAttributes(), even when TeltonikaAssetMapper.preProcessAttributes() has already rewritten them before persistence (for example gpsLocation becomes the canonical Asset.LOCATION). AssetProcessingService resolves events by attribute name, so those rewritten fields are rejected as ATTRIBUTE_NOT_FOUND; the asset row is updated, but live consumers/rules never receive the location update through the normal event pipeline.
Useful? React with 👍 / 👎.



Important
This PR will not be merged into the main repository, but will be eventually converted into an extension. the PR is here as it allows for testing, test deployment, and easy review by reviewers, and since all the code is backend-based, it can be trivially converted into an extension.
Tip
You can find various diagrams and textual descriptions at the end of the PR that may help you understand the overall function of the code.
Description
This PR contains the code for the ground-up rewrite of the Telematics integration of OpenRemote.
After years of using the current version of the Teltonika integration in openremote/fleet-management, a few problems were identified:
Because of these issues, a decision was made to rewrite and gradually deploy a new version of the Telematics integration to make it adhere to proper system design principles.
With a few short words: There are various ingress routes for tracker data to flow inwards, but there's only one way to actually push that data to OpenRemote. To parse messages from devices, we extend the ValueDescriptor to include a parser, which takes in a ByteBuffer and a length, and allows the developer to define exactly how the parameter is parsed.
Commentary and Supporting Diagrams
The important architectural decision is that MQTT, TCP, and UDP are no longer "special systems" with separate asset update logic; they are just different ways of obtaining a DeviceMessage. Once a DeviceMessage exists, the message is handed to TelematicsService and processed through the same queue-backed flow as other telematics input. In practice this means transport classes decode and submit, while create/update/apply behavior is centralized in the message handler/mapping layer.
At model level, the central representation is DeviceMessage (device identity + typed attribute list), and Teltonika semantics are captured by TeltonikaParameter + TeltonikaRegistry + TeltonikaAttributeResolver. This keeps protocol parsing details local to Teltonika metadata but avoids leaking transport details into asset processing. It also preserves strict parse visibility: malformed data still fails at decode/resolve boundaries with explicit exceptions, rather than being silently swallowed or transformed into ambiguous states.
A key part of the PR is mapping consistency between protocol-level fields and OpenRemote-level asset attributes. Known Teltonika parameters are resolved first through TeltonikaTrackerAsset descriptors; this ensures stable, platform-facing attribute names for mapped parameters. For known-but-unmapped parameters, fallback naming is derived from parameter metadata (display-name normalization) rather than opaque numeric IDs; for unknown parameters, deterministic teltonika_ fallback is preserved. This gives us backwards-safe behavior while improving readability and discoverability of newly surfaced parameters.
The mapper layer (TeltonikaAssetMapper) now explicitly handles normalization of well-known OpenRemote attributes (notably location), so platform features like map views work off canonical attributes instead of vendor-specific aliases. Preprocessing now builds a new attribute list from the incoming message (rather than mutating in place), which makes behavior easier to review and reason about: each incoming attribute is either rewritten or passed through, and final normalized output is explicit.
Agent-side changes align with the same direction: the Teltonika agent/protocol stack can ingest TCP/UDP (including dual mode), decode records, and submit to TelematicsService via a publisher interface instead of applying asset updates directly. That avoids a second business-logic path in the agent module and keeps one source of truth for connection state updates, queueing, asset provisioning, and attribute event emission. In other words, the agent is now ingress infrastructure, not a parallel business processor.
Overall, this PR is less about adding one feature and more about reducing architectural drift: one message model, one processing pipeline, one mapping policy, and transport-specific code constrained to decode/ack responsibilities. The net effect should be easier evolution (new codecs/params), less behavioral divergence between transports, and clearer observability when parsing or mapping fails.
Telematics Class Diagram
classDiagram direction LR namespace model_telematics_core { class DeviceMessage { -String deviceId -String protocolName -List~Attribute~ attributes +getDeviceId() String +getProtocolName() String +getAttributes() List~Attribute~ } class TelematicsMessageEnvelope { -String vendorId -String deviceId -String realm -String protocolId -Transport transport -Instant receivedAt -DeviceMessage message } class TelematicsMessageHandler { <<interface>> +getVendorId() String +process(envelope, connection) Optional~String~ } class TelematicsMessagePublisher { <<interface>> +submitMessage(vendorId, realm, transport, codecId, message) void } } namespace model_telematics_protocol { class DeviceProtocol { <<interface>> +getProtocolId() String +canHandle(data, context) boolean +decode(data, context) List~DeviceMessage~ +encodeCommand(command, context) Optional~ByteBuf~ } class DeviceCommand class MessageContext class ProtocolDecodeException class ProtocolEncodeException } namespace manager_telematics { class TelematicsVendor { <<interface>> +getVendorId() String +getProtocol() DeviceProtocol +getParameterRegistry() TelematicsParameterRegistry +getAssetMapper() TelematicsAssetMapper +createMessageHandler(...) TelematicsMessageHandler } class TelematicsService { +TELEMATICS_MESSAGE_QUEUE String +registerVendor(vendor) void +markTrackerConnected(...) DeviceConnection +submitMessage(vendorId, realm, transport, codecId, message) void -processEnvelope(envelope) void } class TeltonikaVendor class TeltonikaMessageHandler class TeltonikaMqttJsonProtocol class TeltonikaTransportProtocol class TeltonikaCommandMapper class TeltonikaSessionManager class TeltonikaMQTTHandler } namespace model_telematics_teltonika { class TeltonikaParameter~T~ { +getId() String +getDisplayName() Optional~String~ +builder() Builder +builder(valueDescriptor, parser) Builder } class TeltonikaParameters { <<catalog>> +TIMESTAMP TeltonikaParameter +EYE_MOVEMENT_COUNT_1 TeltonikaParameter +... many more ... } class TeltonikaRegistry { <<singleton>> +getById(id) Optional~TeltonikaParameter~ +getByFullName(name) Optional~TeltonikaParameter~ +findMatchingAttributeDescriptor(assetClass, valueDescriptor) Optional~AttributeDescriptor~ } class TeltonikaAttributeResolver { +resolveBinaryIo(parameterId, value, timestamp) Attribute +resolveJson(parameterId, rawValue, timestamp) Attribute } class TeltonikaAssetMapper { +createAsset(imei, realm) TeltonikaTrackerAsset +generateAssetId(deviceId) String +applyAttributes(asset, message) List~Attribute~ +preProcessAttributes(asset, message) List~Attribute~ } class TeltonikaTrackerAsset } namespace agent_teltonika { class TeltonikaAgent class TeltonikaProtocol class TeltonikaTcpServer class TeltonikaUdpServer class TeltonikaProtocolDecoder class TeltonikaRecord } class AssetStorageService class AssetProcessingService class DeviceConnection class DeviceSessionManager class TelematicsParameterRegistry~T~ class TelematicsAssetMapper~A~ class Attribute class AttributeDescriptor class ValueDescriptor class Agent~T,U,V~ class DefaultAgentLink class AbstractProtocol~A,L~ class AbstractTCPServer~M~ class AbstractUDPServer~M~ class ByteToMessageDecoder class ByteBuf class ContainerService TelematicsService ..|> ContainerService TelematicsService ..|> TelematicsMessagePublisher TelematicsService --> TelematicsVendor TelematicsService --> TelematicsMessageHandler TelematicsService --> TelematicsMessageEnvelope TelematicsService --> DeviceConnection TelematicsService --> AssetStorageService TelematicsService --> AssetProcessingService TelematicsVendor --> DeviceProtocol TelematicsVendor --> TelematicsParameterRegistry~T~ TelematicsVendor --> TelematicsAssetMapper~A~ TelematicsVendor --> DeviceSessionManager TelematicsVendor --> TelematicsMessageHandler TeltonikaVendor ..|> TelematicsVendor TeltonikaVendor --> TeltonikaRegistry TeltonikaVendor --> TeltonikaAssetMapper TeltonikaVendor --> TeltonikaCommandMapper TeltonikaVendor --> TeltonikaMqttJsonProtocol TeltonikaVendor --> TeltonikaTransportProtocol TeltonikaVendor --> TeltonikaSessionManager TeltonikaVendor --> TeltonikaMessageHandler TeltonikaMqttJsonProtocol ..|> DeviceProtocol TeltonikaTransportProtocol ..|> DeviceProtocol TeltonikaMqttJsonProtocol --> TeltonikaAttributeResolver TeltonikaMqttJsonProtocol --> DeviceMessage TeltonikaMqttJsonProtocol --> MessageContext TeltonikaMqttJsonProtocol --> ByteBuf TeltonikaMqttJsonProtocol --> ProtocolDecodeException TeltonikaMqttJsonProtocol --> ProtocolEncodeException TeltonikaMQTTHandler --> TeltonikaVendor TeltonikaMQTTHandler --> TelematicsService TeltonikaMQTTHandler --> DeviceProtocol TeltonikaMQTTHandler --> MessageContext TeltonikaMessageHandler ..|> TelematicsMessageHandler TeltonikaMessageHandler --> TeltonikaAssetMapper TeltonikaMessageHandler --> TeltonikaTrackerAsset TeltonikaMessageHandler --> DeviceMessage TeltonikaMessageHandler --> DeviceConnection TeltonikaRegistry ..|> TelematicsParameterRegistry~T~ TeltonikaRegistry --> TeltonikaParameters TeltonikaRegistry --> TeltonikaParameter TeltonikaRegistry --> AttributeDescriptor TeltonikaParameter --|> ValueDescriptor TeltonikaParameters --> TeltonikaParameter TeltonikaAttributeResolver --> TeltonikaRegistry TeltonikaAttributeResolver --> TeltonikaParameter TeltonikaAttributeResolver --> TeltonikaTrackerAsset TeltonikaAttributeResolver --> Attribute TeltonikaAssetMapper ..|> TelematicsAssetMapper~A~ TeltonikaAssetMapper --> TeltonikaTrackerAsset TeltonikaAssetMapper --> DeviceMessage TeltonikaAssetMapper --> Attribute TeltonikaAgent --|> Agent~T,U,V~ TeltonikaAgent --> TeltonikaProtocol TeltonikaProtocol --|> AbstractProtocol~A,L~ TeltonikaProtocol --> TeltonikaTcpServer TeltonikaProtocol --> TeltonikaUdpServer TeltonikaProtocol --> TelematicsMessagePublisher TeltonikaProtocol --> TeltonikaRecord TeltonikaProtocol --> DeviceMessage TeltonikaTcpServer --|> AbstractTCPServer~M~ TeltonikaUdpServer --|> AbstractUDPServer~M~ TeltonikaTcpServer --> TeltonikaProtocolDecoder TeltonikaUdpServer --> TeltonikaProtocolDecoder TeltonikaProtocolDecoder --|> ByteToMessageDecoder TeltonikaProtocolDecoder --> TeltonikaRecord TeltonikaProtocolDecoder --> TeltonikaAttributeResolver TeltonikaProtocolDecoder --> TeltonikaRegistry TeltonikaProtocolDecoder --> ByteBufTelematics Sequence Diagrams
End-to-End Ingestion (MQTT, TCP, UDP)
sequenceDiagram autonumber box rgba(232,245,255,0.6) Ingestion Sources participant DeviceMQTT as Teltonika Device (MQTT) participant DeviceTCP as Teltonika Device (TCP) participant DeviceUDP as Teltonika Device (UDP) end box rgba(233,255,239,0.6) Ingress Layer participant MQTTHandler as TeltonikaMQTTHandler participant AgentProtocol as TeltonikaProtocol (Agent) participant Decoder as TeltonikaProtocolDecoder participant DeviceProtocol as DeviceProtocol (Teltonika) end box rgba(255,248,232,0.6) Normalized Messaging participant Publisher as TelematicsMessagePublisher participant Service as TelematicsService participant Queue as SEDA telematics-device-messages participant Envelope as TelematicsMessageEnvelope end box rgba(255,234,242,0.6) Processing Layer participant Handler as TeltonikaMessageHandler participant Mapper as TeltonikaAssetMapper participant Resolver as TeltonikaAttributeResolver participant Registry as TeltonikaRegistry participant Store as AssetStorageService participant Processing as AssetProcessingService end alt MQTT path DeviceMQTT->>MQTTHandler: publish JSON on realm/client/teltonika/{imei}/data MQTTHandler->>DeviceProtocol: decode(ByteBuf, MessageContext) DeviceProtocol->>Resolver: resolveJson(paramId, rawValue, ts) Resolver->>Registry: lookup TeltonikaParameter + matching descriptor Registry-->>Resolver: known/unknown mapping metadata Resolver-->>DeviceProtocol: Attribute list entries DeviceProtocol-->>MQTTHandler: List<DeviceMessage> MQTTHandler->>Service: submitMessage(vendorId, realm, MQTT, codec, message) end alt TCP path DeviceTCP->>AgentProtocol: send identification/AVL packet AgentProtocol->>Decoder: decode TCP frame Decoder->>Resolver: resolveBinaryIo(avlId, bytes, ts) Resolver->>Registry: parameter/descriptor lookup Registry-->>Resolver: mapping info Resolver-->>Decoder: typed attributes Decoder-->>AgentProtocol: TeltonikaRecord(s) AgentProtocol->>AgentProtocol: toDeviceMessage(record) AgentProtocol->>Publisher: submitMessage(vendorId, realm, TCP, codec, message) Publisher->>Service: submitMessage(...) end alt UDP path DeviceUDP->>AgentProtocol: send UDP AVL packet AgentProtocol->>Decoder: decode UDP frame Decoder-->>DeviceUDP: UDP ACK Decoder-->>AgentProtocol: TeltonikaRecord(s) AgentProtocol->>AgentProtocol: toDeviceMessage(record) AgentProtocol->>Publisher: submitMessage(vendorId, realm, UDP, codec, message) Publisher->>Service: submitMessage(...) end Service->>Service: markTrackerConnected / update DeviceConnection Service->>Envelope: build envelope(vendor,device,realm,protocol,transport,message) Service->>Queue: enqueue envelope Queue->>Service: dequeue envelope Service->>Handler: process(envelope, connection) Handler->>Mapper: getOrCreateAsset + applyAttributes Mapper->>Mapper: preProcessAttributes (normalize well-known fields) Handler->>Store: merge asset Handler->>Processing: send attribute events Handler-->>Service: Optional<assetId> Service->>Service: update connection assetIdKey Mapping Resolution Flow
sequenceDiagram autonumber participant Protocol as TeltonikaMqttJsonProtocol / Decoder participant Resolver as TeltonikaAttributeResolver participant Registry as TeltonikaRegistry participant Tracker as TeltonikaTrackerAsset metadata Protocol->>Resolver: resolveJson/resolveBinaryIo(parameterId, raw, ts) Resolver->>Registry: getById/getByFullName(parameterId) alt Known TeltonikaParameter Registry-->>Resolver: descriptor metadata (id, type, parser, displayName) Resolver->>Registry: findMatchingAttributeDescriptor(TeltonikaTrackerAsset, parameter) alt Descriptor exists on tracker asset Registry-->>Resolver: AttributeDescriptor match Resolver-->>Protocol: Attribute(name = tracker descriptor name, typed value) else Descriptor missing Registry-->>Resolver: no match Resolver-->>Protocol: Attribute(name = normalized displayName, typed value) end else Unknown parameter Registry-->>Resolver: empty Resolver-->>Protocol: Attribute(name = teltonika_{id}, inferred type/raw) end