消息队列
编辑消息队列
编辑Node.js 代理会自动为与 Amazon SQS 消息队列之间活动的创建跨度。要记录这些跨度,您的消息队列活动必须在事务期间发生。如果您在来自支持的框架的 HTTP 请求期间执行队列操作,代理将自动启动事务。但是,如果您在独立程序(例如消息处理器)中执行队列操作,则需要使用 Node.js 代理的startTransaction()
方法手动为您的消息创建事务。
您可以在以下代码示例中看到这一点。
const apm = require('elastic-apm-node').start({/*...*/}) const AWS = require('aws-sdk'); // Set the region AWS.config.update({region: 'us-west'}); // Create an SQS service object const sqs = new AWS.SQS({apiVersion: '2012-11-05'}); /* ... */ const transaction = apm.startTransaction("Process Messages", 'cli') sqs.receiveMessage(params, function(err, data) { if(err) { console.log("Receive Error", err); } else { console.log(`Length: ${data.Messages.length}`) /* process messages */ } // end the transaction transaction.end() })
分布式追踪和消息队列
编辑要启用具有分布式追踪的队列调度和队列处理,请使用 Node.js 代理的 API 将traceparent
标头存储到您的队列消息中;然后,在启动新事务时提供该traceparent
标头。
这是一个使用 Node.js 代理 API 将traceparent
作为消息属性存储,然后使用该属性将您的新事务与原始事务关联的新示例。
存储 Traceparent
发送消息时,您需要将追踪添加为MessageAttributes
之一。
// stores the traceparent when sending the queue message const traceParent = apm.currentTransaction ? apm.currentTransaction.traceparent : '' // Use the Amazon SQS `MessageAttributes` to pass // on the traceparent header const params = { /* ... other params ... */ MessageAttributes: { /* ... other attributes ... */ "MyTraceparent":{ DataType: "String", StringValue: traceParent } } } sqs.sendMessage(params, function(err, data) { /* ... */ });
这将保存 traceparent 值,以便我们稍后在接收消息时使用它。
应用 Traceparent
当我们收到队列消息时,我们将检查消息中的 Traceparent 标头,并用它来启动一个新事务。通过使用此 traceparent 标头启动事务,我们将通过分布式追踪链接发送和接收。
// uses the traceparent to start a transaction sqs.receiveMessage(params, function(err, data) { if(!data.Messages) { return } // loop over your returned messages for(const message of data.Messages) { // start a transaction to process each message, using our previously // saved distributed tracing traceparent header let traceparent if(message.MessageAttributes.MyTraceparent) { traceparent = message.MessageAttributes.MyTraceparent.StringValue } const transactionMessage = apm.startTransaction('RECEIVE_TRANSACTION', 'cli', { childOf:traceparent }) /* ... process message ... */ transactionMessage.end() } })