消息队列编辑

Node.js 代理将自动为往返 Amazon SQS 消息队列的活动创建跨度。要记录这些跨度,您的消息队列活动必须在事务期间发生。如果您在来自 受支持的框架 的 HTTP 请求期间执行队列操作,则代理将自动启动事务。但是,如果您在独立程序(例如消息处理器)中执行队列操作,则需要使用 Node.js 代理的 startTransaction() 方法为您的消息手动创建事务。

您可以在以下代码示例中看到这一点。

const apm = require('elastic-apm-node').start({/*...*/})
const AWS = require('aws-sdk');
// Set the region
AWS.config.update({region: 'us-west'});

// Create an SQS service object
const sqs = new AWS.SQS({apiVersion: '2012-11-05'});

/* ... */

const transaction = apm.startTransaction("Process Messages", 'cli') 
sqs.receiveMessage(params, function(err, data) {
  if(err) {
    console.log("Receive Error", err);
  } else {
    console.log(`Length: ${data.Messages.length}`)
    /* process messages */
  }
  // end the transaction
  transaction.end() 
})

在调用 sqs.receiveMessage 方法之前,启动一个新事务。

仅在队列的处理回调完成执行*之后*结束事务。这将确保在处理您的队列消息时事务处于活动状态。

分布式追踪和消息队列编辑

要使用分布式追踪启用队列调度和队列处理,请使用 Node.js 代理的 API *存储*队列消息的 traceparent 标头;然后,在启动新事务时提供该 traceparent 标头。

这是一个*新*示例,它使用 Node.js 代理 API 将 traceparent 存储为消息属性,然后使用该属性将您的新事务与原始事务链接起来。

存储 Traceparent

发送消息时,您需要将追踪添加为 MessageAttributes 之一。

// stores the traceparent when sending the queue message
const traceParent = apm.currentTransaction ? apm.currentTransaction.traceparent : ''

// Use the Amazon SQS `MessageAttributes` to pass
// on the  traceparent header
const params = {
  /* ... other params ... */
  MessageAttributes: {
    /* ... other attributes ... */
    "MyTraceparent":{
        DataType: "String",
        StringValue: traceParent
    }
  }

}
sqs.sendMessage(params, function(err, data) {
  /* ... */
});

这将保存 traceparent 值,以便我们以后在接收消息时可以使用它。

应用 Traceparent

当我们收到队列消息时,我们将检查消息中是否有 Traceparent 标头,并使用它来启动一个新事务。通过使用此 traceparent 标头启动事务,我们将通过分布式追踪链接发送和接收。

// uses the traceparent to start a transaction

sqs.receiveMessage(params, function(err, data) {
  if(!data.Messages) {
    return
  }

  // loop over your returned messages
  for(const message of data.Messages) { 
    // start a transaction to process each message, using our previously
    // saved distributed tracing traceparent header
    let traceparent
    if(message.MessageAttributes.MyTraceparent) {
        traceparent = message.MessageAttributes.MyTraceparent.StringValue
    }
    const transactionMessage = apm.startTransaction('RECEIVE_TRANSACTION', 'cli', {
      childOf:traceparent 
    })
    /* ... process message ... */
    transactionMessage.end() 
  }
})

即使我们只调度了一条队列消息,Amazon 的 SQS API 也会返回*多个*消息的数组。因此,我们需要遍历每一个消息。

我们提取之前保存的 traceparent 标头,并使用它来启动一个事务。

处理完一条消息后,我们结束事务并继续处理下一条消息。