消息队列

编辑

Node.js 代理会自动为与 Amazon SQS 消息队列之间活动的创建跨度。要记录这些跨度,您的消息队列活动必须在事务期间发生。如果您在来自支持的框架的 HTTP 请求期间执行队列操作,代理将自动启动事务。但是,如果您在独立程序(例如消息处理器)中执行队列操作,则需要使用 Node.js 代理的startTransaction()方法手动为您的消息创建事务。

您可以在以下代码示例中看到这一点。

const apm = require('elastic-apm-node').start({/*...*/})
const AWS = require('aws-sdk');
// Set the region
AWS.config.update({region: 'us-west'});

// Create an SQS service object
const sqs = new AWS.SQS({apiVersion: '2012-11-05'});

/* ... */

const transaction = apm.startTransaction("Process Messages", 'cli') 
sqs.receiveMessage(params, function(err, data) {
  if(err) {
    console.log("Receive Error", err);
  } else {
    console.log(`Length: ${data.Messages.length}`)
    /* process messages */
  }
  // end the transaction
  transaction.end() 
})

在调用sqs.receiveMessage方法之前,启动一个新事务。

只有在队列的处理回调执行完毕之后才能结束事务。这将确保在处理队列消息时事务处于活动状态。

分布式追踪和消息队列

编辑

要启用具有分布式追踪的队列调度和队列处理,请使用 Node.js 代理的 API 将traceparent标头存储到您的队列消息中;然后,在启动新事务时提供该traceparent标头。

这是一个使用 Node.js 代理 API 将traceparent作为消息属性存储,然后使用该属性将您的新事务与原始事务关联的示例。

存储 Traceparent

发送消息时,您需要将追踪添加为MessageAttributes之一。

// stores the traceparent when sending the queue message
const traceParent = apm.currentTransaction ? apm.currentTransaction.traceparent : ''

// Use the Amazon SQS `MessageAttributes` to pass
// on the  traceparent header
const params = {
  /* ... other params ... */
  MessageAttributes: {
    /* ... other attributes ... */
    "MyTraceparent":{
        DataType: "String",
        StringValue: traceParent
    }
  }

}
sqs.sendMessage(params, function(err, data) {
  /* ... */
});

这将保存 traceparent 值,以便我们稍后在接收消息时使用它。

应用 Traceparent

当我们收到队列消息时,我们将检查消息中的 Traceparent 标头,并用它来启动一个新事务。通过使用此 traceparent 标头启动事务,我们将通过分布式追踪链接发送和接收。

// uses the traceparent to start a transaction

sqs.receiveMessage(params, function(err, data) {
  if(!data.Messages) {
    return
  }

  // loop over your returned messages
  for(const message of data.Messages) { 
    // start a transaction to process each message, using our previously
    // saved distributed tracing traceparent header
    let traceparent
    if(message.MessageAttributes.MyTraceparent) {
        traceparent = message.MessageAttributes.MyTraceparent.StringValue
    }
    const transactionMessage = apm.startTransaction('RECEIVE_TRANSACTION', 'cli', {
      childOf:traceparent 
    })
    /* ... process message ... */
    transactionMessage.end() 
  }
})

即使我们只调度了一条队列消息,Amazon 的 SQS API 也会返回多个消息的数组。因此,我们需要遍历每一个。

我们提取之前保存的 traceparent 标头,并用它来启动一个事务。

处理完一条消息后,我们结束事务并继续处理下一条。