Distributed Tracing with Quarkus, Python, OpenTelemetry and Jaeger (Part 3)

Heiko W. Rupp
Published in ITNEXT, Aug 17, 2022


The previous two parts (1, 2) of this series focused on tracing basics and on instrumenting Quarkus and Python code for HTTP requests. A very popular mechanism for data transfer is Apache Kafka. In this post, we’ll look at how to instrument code that uses Kafka and get trace propagation working. The general scenario is similar to the previous one, except that for the Quarkus part, I use only one server:

Setup of this part

Quarkus is configured for tracing in pom.xml and application.properties, as described in the first part of the series. The code to send data to topic1 then looks like this (the messaging getting-started guide has more information):

@Inject
@Channel("topic1")
Emitter<String> emitter;

@GET
public String doSend() {
    emitter.send("Hello World");
    return "done";
}

When I call the endpoint via curl (the @GET annotation marks it as a REST endpoint), a message with the payload “Hello World” is sent to topic1 on Kafka. Looking at the sent message with kafkacat, we can see the W3C traceparent header that we already discussed in the previous part:

$ kcat -C -t topic1 -b localhost:9092 -o beginning -J
{"topic":"topic1","partition":0,"offset":0,
"tstype":"create","ts":1660559408201,"broker":0,
"headers":["traceparent","00-08b53bb8eb480e0abda9acb0ccb3e636-755009f86a258b5f-01"],
"key":null,
"payload":"Hello World"}

Note that kafkacat (since renamed to kcat, the command used above) only shows the payload unless a formatting option like -J is provided.
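
The traceparent value packs four dash-separated, lowercase-hex fields defined by the W3C Trace Context specification: version, trace-id (16 bytes), parent-id (8 bytes, the span ID of the sender) and trace-flags (bit 0 means "sampled"). The following is a minimal, standard-library-only sketch that splits the header shown above into these fields. `parse_traceparent` and `TraceParent` are illustrative names, not part of any library, and the checks only cover the basics of version-00 headers:

```python
import re
from dataclasses import dataclass

# version-traceid-parentid-flags, all lowercase hex (strict version-00 layout)
_TRACEPARENT = re.compile(
    r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


@dataclass(frozen=True)
class TraceParent:
    version: str
    trace_id: str   # shared by every span of one trace
    parent_id: str  # span ID of the sender's span
    sampled: bool   # bit 0 of trace-flags


def parse_traceparent(header: str) -> TraceParent:
    """Split a traceparent header into its fields (hypothetical helper)."""
    match = _TRACEPARENT.match(header.strip())
    if match is None:
        raise ValueError(f"malformed traceparent: {header!r}")
    version, trace_id, parent_id, flags = match.groups()
    # the specification treats all-zero IDs as invalid
    if set(trace_id) == {"0"} or set(parent_id) == {"0"}:
        raise ValueError("all-zero trace-id or parent-id")
    return TraceParent(version, trace_id, parent_id, bool(int(flags, 16) & 0x01))


tp = parse_traceparent("00-08b53bb8eb480e0abda9acb0ccb3e636-755009f86a258b5f-01")
print(tp.trace_id, tp.parent_id, tp.sampled)
# prints: 08b53bb8eb480e0abda9acb0ccb3e636 755009f86a258b5f True
```

The trace-id is what ties all spans of one request together across Quarkus, Kafka and Python. The parent-id changes at every hop.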

Now on to the Python code

The Python code starts out pretty simple: we start a consumer on topic1 and print incoming messages:

from kafka import KafkaConsumer

consumer = KafkaConsumer('topic1')

for msg in consumer:
    print(msg)

Running the code prints a message like the following to the console, where we can see the propagated traceparent header and the message body:

ConsumerRecord(topic='topic1', partition=0, 
offset=9, timestamp=1660563030653,
value=b'Hello World',
headers=[
('traceparent', b'00-8a6a62d94e611819d4f7a3523d95542b-5fd53b80e46d4ffa-01')
]
...
)

We could now extract the header and create the SpanContext ourselves, as we did for HTTP in Part 2, but this time we take the easy route and use the existing OpenTelemetry instrumentation for Kafka:

from opentelemetry.instrumentation.kafka import KafkaInstrumentor
KafkaInstrumentor().instrument()

This sets up all the necessary magic, so that our code can be pretty crisp:

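from kafka import KafkaConsumer, KafkaProducer
from opentelemetry import trace

# assumes tracer provider and exporter are configured as in the previous parts
tracer = trace.get_tracer(__name__)

consumer = KafkaConsumer('topic1')
producer = KafkaProducer()

for msg in consumer:
    with tracer.start_as_current_span("do-the-work-span") as span:
        # do the work
        body = msg.value.decode('utf-8')
        body = body + ' from Python'
        # and send it off to topic2
        producer.send('topic2', body.encode('utf-8'))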
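
On the sending side, the instrumentation's job is essentially the reverse of what we saw above: it adds a traceparent header to each outgoing record so that the next consumer can continue the trace. kafka-python's `KafkaProducer.send()` takes headers as a list of `(str, bytes)` tuples. The sketch below only models that header handling with the standard library. `with_traceparent` is a hypothetical helper, not the instrumentation's actual code, and in a real application the trace and span IDs would come from the current OpenTelemetry span rather than being passed in by hand:

```python
from typing import List, Optional, Tuple

# kafka-python represents record headers as (str key, bytes value) pairs
Headers = List[Tuple[str, bytes]]


def with_traceparent(headers: Optional[Headers], trace_id: int, span_id: int,
                     sampled: bool = True) -> Headers:
    """Return a copy of `headers` carrying exactly one traceparent (hypothetical helper)."""
    flags = "01" if sampled else "00"
    value = f"00-{trace_id:032x}-{span_id:016x}-{flags}"
    # drop any traceparent that is already present, keep all other headers
    kept = [(key, val) for key, val in (headers or []) if key != "traceparent"]
    return kept + [("traceparent", value.encode("ascii"))]


headers = with_traceparent(None, 0x08b53bb8eb480e0abda9acb0ccb3e636, 0x755009f86a258b5f)
print(headers)
# prints: [('traceparent', b'00-08b53bb8eb480e0abda9acb0ccb3e636-755009f86a258b5f-01')]

# with kafka-python, such a list could be passed on as
# producer.send('topic2', body.encode('utf-8'), headers=headers)
```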

When we trigger a Kafka message from Quarkus (with the Quarkus receiver already in place), Jaeger shows the following:

Jaeger showing 2 traces

Wait, this is not what we wanted. We see “enough” data, but those two traces should be a single one. Some digging and debugging shows that the Kafka instrumentation does not (yet, as of v1.11) use the trace ID of the received message as the parent for our own span.
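
Conceptually, a span started without a parent context becomes the root of a brand-new trace with a freshly generated trace ID, while a span started with a parent inherits the parent's trace ID. That is why Jaeger shows two separate traces here. Below is a tiny standard-library model of this rule. The `SpanCtx` type and the `start_span` function are illustrative only, not OpenTelemetry's API; the 128-bit trace IDs and 64-bit span IDs match the W3C format:

```python
import secrets
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SpanCtx:
    trace_id: int  # 128-bit, shared by all spans of one trace
    span_id: int   # 64-bit, unique per span


def start_span(parent: Optional[SpanCtx] = None) -> SpanCtx:
    """Toy model: inherit the parent's trace ID, or start a brand-new trace."""
    if parent is not None:
        trace_id = parent.trace_id
    else:
        trace_id = secrets.randbits(128)
    return SpanCtx(trace_id=trace_id, span_id=secrets.randbits(64))


sender = start_span()                # the span on the Quarkus side
unlinked = start_span()              # no parent: a second, separate trace
linked = start_span(parent=sender)   # parent taken from traceparent: same trace

print(unlinked.trace_id == sender.trace_id)  # False (barring a 1-in-2**128 collision)
print(linked.trace_id == sender.trace_id)    # True
```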

There is help

With the traceparent in the header and the knowledge from Part 2 of the series, we can easily fix this:

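from opentelemetry import trace
from opentelemetry.trace import NonRecordingSpan

# get_trace_parent_header() and extract_trace_data() are small helper
# functions whose code is not shown in this post
for msg in consumer:
    # look for traceparent header and return its value
    trace_parent = get_trace_parent_header(msg)
    # Create a SpanContext object from the header value
    span_context = extract_trace_data(trace_parent)
    # Use this SpanContext as parent
    ctx = trace.set_span_in_context(NonRecordingSpan(span_context))

    with tracer.start_as_current_span("do-the-work-span", context=ctx) as span:
        ...  # do the work and send it off to topic2 as before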
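
The two helpers are not shown in this post. Below is a minimal sketch of what they could look like. It assumes kafka-python record headers (a list of `(str, bytes)` tuples, as in the console output above) and a version-00 traceparent. To run with the standard library alone, this version returns plain integers instead of an OpenTelemetry object. With the OpenTelemetry API installed, you would pass them on as `SpanContext(trace_id=..., span_id=..., is_remote=True, trace_flags=TraceFlags(...))` from `opentelemetry.trace`. The `TraceData` type and the `SimpleNamespace` stand-in for a `ConsumerRecord` are hypothetical:

```python
from types import SimpleNamespace
from typing import NamedTuple, Optional


class TraceData(NamedTuple):
    trace_id: int
    span_id: int
    trace_flags: int


def get_trace_parent_header(msg) -> Optional[str]:
    """Return the traceparent value of a kafka-python record, or None."""
    for key, value in msg.headers or []:
        if key == "traceparent":
            return value.decode("ascii")
    return None


def extract_trace_data(trace_parent: str) -> TraceData:
    """Split a version-00 traceparent into the integers a SpanContext needs."""
    version, trace_id, span_id, flags = trace_parent.split("-")
    if version != "00" or len(trace_id) != 32 or len(span_id) != 16:
        raise ValueError(f"unsupported traceparent: {trace_parent!r}")
    return TraceData(int(trace_id, 16), int(span_id, 16), int(flags, 16))


# stand-in for a ConsumerRecord, using the header printed earlier
record = SimpleNamespace(headers=[
    ("traceparent", b"00-8a6a62d94e611819d4f7a3523d95542b-5fd53b80e46d4ffa-01")])
data = extract_trace_data(get_trace_parent_header(record))
print(f"{data.trace_id:032x} {data.span_id:016x} {data.trace_flags}")
# prints: 8a6a62d94e611819d4f7a3523d95542b 5fd53b80e46d4ffa 1
```

Messages that carry no traceparent make `get_trace_parent_header()` return None. In the consumer loop, it is worth checking for that and simply starting a root span in this case.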

And now it looks the way it is supposed to:

Full trace of tracing a message over Kafka

The Quarkus receiver

The last missing piece is the receiving side in Quarkus, and it is as simple as this:

@Incoming("topic2")
void process(String message) {
    System.out.println("Got a message: " + message);
}

Yep, that’s it. The instrumentation is doing everything for us.

Resend the message within Quarkus

As in the Python case, I now wanted to forward the incoming message again. The setup stays largely the same, but the Quarkus process now checks the received message and, if needed, forwards it to k-serv again:

New slightly changed setup

When the trigger is hit, message (1) is sent to k-serv, which transforms it and sends message (2) back. Q-serv then checks whether it has seen the message before; if not, it forwards it as message (3) to k-serv, which transforms it and sends it back once more (4).
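
The round trip can be modelled in a few lines. This is purely an illustration of the message flow, written in Python to match the other sketches. The post does not say how Q-serv decides whether it has seen a message before, so the rule used here (forward only messages that k-serv has transformed exactly once) is an assumption, as are the function names. The k-serv transformation mirrors the Python consumer above, which appends “ from Python”:

```python
SUFFIX = " from Python"


def k_serv(message: str) -> str:
    """Transform like the Python consumer: append the suffix."""
    return message + SUFFIX


def q_serv_should_forward(message: str) -> bool:
    """Hypothetical 'seen before' check: forward only after the first round."""
    return message.count(SUFFIX) < 2


def round_trip(payload: str) -> list:
    log = [payload]                  # (1) trigger sends the payload to k-serv
    message = k_serv(payload)        # (2) k-serv transforms it and sends it back
    log.append(message)
    while q_serv_should_forward(message):
        log.append(message)          # (3) Q-serv forwards it to k-serv again
        message = k_serv(message)    # (4) k-serv transforms and re-sends
        log.append(message)
    return log


for step, msg in enumerate(round_trip("Hello World"), start=1):
    print(f"({step}) {msg}")
# prints:
# (1) Hello World
# (2) Hello World from Python
# (3) Hello World from Python
# (4) Hello World from Python from Python
```

All four hops should end up in the same trace, which is what the changes below achieve.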

As in the Python example, we want the forwarding in Quarkus to propagate the trace information as well, so that we get the full picture.

Unfortunately, this propagation is not yet automatic (as of Quarkus 2.11), so we need to do a little work ourselves. First, we change the method signature to receive a Message instead of a plain String:

@Incoming("topic2")
CompletionStage<Void> process(Message<String> message) {

This gives us access to the message metadata, including the tracing information taken from the traceparent header:

Optional<TracingMetadata> optionalTracingMetadata = TracingMetadata.fromMessage(message);
if (optionalTracingMetadata.isPresent()) {
    TracingMetadata tracingMetadata = optionalTracingMetadata.get();

We then use it to set the current context for this unit of work. The try-with-resources block ensures that the scope is closed at the end, as the Context API requires:

try (Scope _ignored =  
tracingMetadata.getCurrentContext().makeCurrent()) {

If the message should be resent (number (3) above), we create a new Message, attach the tracing information to it, and send it off:

// Get the body
String body = message.getPayload();
// We need to use a Message to emit with headers
Message<String> out = Message.of(body);

// Add the tracing metadata to the outgoing message header
out = out.addMetadata(
TracingMetadata.withCurrent(Context.current()));

// And send to round 2
emitter.send(out);

With these changes, Jaeger nicely shows how the message is processed:

Screenshot from Jaeger UI showing the processing of the message

Show me the code

Both the Python and the Quarkus code live on GitHub. The extended use case with Quarkus forwarding data lives in the forwarding branch.

Thanks go to Bruno Baptista for feedback on this article.
