您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Modeler   Code  
会员   
 
   
 
 
     
   
 订阅
  捐助
AWS Lambda用于事件驱动的数据处理管道
 
作者:Vadim Astakhov   来源: blogs   火龙果软件  发布于 2015-11-13
   次浏览      
 

摘要:一些大数据客户想分析新数据以对特定事件作出响应,他们可能已经定义好管道来执行批处理操作,这些管道是由AWS Data Pipeline精心协调安排的。

一些大数据客户想分析新数据以对特定事件作出响应,他们可能已经定义好管道来执行批处理操作,这些管道是由AWS Data Pipeline精心协调安排的。事件触发管道的示例之一就是当数据分析师在一收到数据就必须对其进行分析时,以便他们可以立刻向合作伙伴作出相应。在这种情况下调度不是最优的解决方案,主要问题是如何在任意时间使用依赖于调度程序的Data Pipeline调度数据处理过程。

这里有一个解决方案。首先,创建一个简单的管道,使用来自Amazon S3的数据对管道进行测试,然后添加一个Amazon SNS主题,使其在管道完成时通知客户,以便数据分析师能够查看处理结果。最后,创建一个AWS Lambda函数,使其在新数据被成功提交到S3桶中时激活Data Pipeline,在此过程中,不用管理任何调度活动。该篇帖子将会向你展示如何实现这一过程。

在Data Pipeline活动可被调度时,客户可以定义先决条件。这些先决条件可以看到数据是否存在于S3中,然后进行资源分配。但是,在Data Pipeline需要随时被激活时,使用Lambda是一种很好的途径。

克隆管道以备后用

在这种场景下,客户的管道已经通过一些预定的活动被激活,但是想要能够调用相同的管道以对某个特别事件,如提交新数据到S3桶中,作出响应。客户已经开发了一个达到Finished状态的“模板”管道。

重新发起该管道的一种方法是在S3中使用管道定义来保存JSON文件,使用它创建一个新管道。一些客户在S3中对相同管道以多个版本的形式存储,但是又想克隆和重新使用最近刚刚执行的那个管道版本。从已完成管道中获取管道定义并创建一个克隆管道,这是可以满足这种要求的简单方法。这种方法依赖于最近被执行的管道,不需要客户保存来自S3的管道版本注册表,也不需要追踪最近被执行的版本。

即使客户想在S3中保留这样的一个管道注册表,他们可能也想使用Lambda API即时从一个既存的管道中获取一个管道定义。他们可能有复杂的事件驱动工作流程,在这些流程中,他们需要克隆已完成的管道,重新运行它们,然后删除克隆的管道。这就是为什么首先检测处于Finished状态的管道是如此重要了。

在本篇帖子中,我会向你展示如何完成这样即时的管道克隆。在Data Pipeline中没有直接克隆API,所以你可以进行几次API调用完成这一过程。我也提供了代码,使你能够删除已完成的过时的克隆管道。

三步式工作流程

  1. 创建一个简单管道用于测试。
  2. 创建一个SNS通知,在管道完成时通知分析师。
  3. 创建一个Lambda函数,在新数据被提交到S3桶中时激活管道

第一步:创建一个简单管道。

  1. 打开AWS Data Pipeline控制台。
  2. 如果在该域中还没有创建管道,控制台屏幕将会展示介绍性的信息。在这种情况下,选择Get started now。如果在该域中你已经创建过管道了,控制台将会显示你在该域中创建的所有管道。在这种情况下,选择Create new pipeline
  3. 输入名称和描述信息。
  4. 选择一个Elastic MapReduce (EMR)模板,然后选择Run once on pipeline activation
  5. Step字段中,输入如下信息:
/home/hadoop/contrib/streaming/hadoop-streaming.jar,-input,
s3n://elasticmapreduce/samples/wordcount/input,-output,
s3://example-bucket/wordcount/output/#{@scheduledStartTime},-mapper,
s3n://elasticmapreduce/samples/wordcount/wordSplitter.py,-reducer,aggregate

你可以调整Amazon EMR集群节点的数量,选择分发方式。想要获取管道创建的更多信息,参见Getting Started with AWS Data Pipeline。

第二步:创建一个SNS主题

想要创建一个SNS主题,执行以下步骤:

  1. 在浏览器的一个新页签中,打开Amazon SNS console(Amazon SNS控制台)。
  2. 选择Create topic
  3. Topic name字段中,输入主题名称。
  4. 选择Create topic

选择新主题,然后选择主题ARN。Topic Details页面出现

  1. 拷贝主题ARN用于下一个任务。
  2. 为该主题创建订阅任务,提供你的电子邮件地址。AWS会发送电子邮件来确认你的订阅结果。

想要在管道中配置主题通知动作,执行以下步骤

  1. 在AWS Data Pipeline控制台,在Architect窗口中打开你的管道。
  2. 在右侧窗格中,选择Others
  3. 在DefaultAction1下,执行如下步骤:
    1. 输入通知的名称(如MyEMRJobNotice)
    2. Type字段中,选择SnsAlarm
    3. Subject字段中,输入事由行。
    4. Topic Arn字段中,输入主题的ARN。
    5. Message字段中,输入消息内容。
    6. LeaveRoleset to the default value.Role保留其默认值。

保存并激活管道,确保它能成功执行。

第三步:创建一个Lambda函数

在Lambda控制台中,选择Create a Lambda function。你可以选择一个蓝图或者只是跳过第一步,继续进行Step 2: Configure function第二步:配置函数),在该步骤中,你提供一个函数名称(如LambdaDP)和一条描述信息,选择Node.js作为Runtime字段的值。

测试管道已经完成。目前仍不支持重新运行已完成的管道。要想重新运行一个已完成管道,从模板中克隆该管道,Lambda会触发一个新管道。每一次清除老的克隆管道时,你将需要Lambda来创建一个新克隆管道。下面是帮助实现新管道克隆的一些函数。在Lambda控制台中,使用Code entry typeEdit code inline字段,以下面的代码开始:

console.log('Loading function'); var AWS = require('aws-sdk'); exports.handler  
= function(event, context) { var Data Pipeline = new AWS.Data Pipeline();
var pipeline2delete ='None'; var pipeline ='df-02….T'; ………. }

定义管道ID,为克隆管道ID创建一个变量,比如pipeline2delete。然后,添加一个函数,执行下面的代码,检查前面的运行过程中遗留下来的既存克隆管道:

//Iterate over the list of pipelines and check if the pipeline clone already  
exists Data Pipeline.listPipelines(paramsall, function(err, data) { if
(err) {console.log(err, err.stack); // an error occurred} else {console.log(data);
// successful response for (var i in data.pipelineIdList){ if (data.pipelineIdList[i].name
=='myLambdaSample') { pipeline2delete = data.pipelineIdList[i].id; console.log('Pipeline
clone id to delete: ' + pipeline2delete); };

如果前面的运行过程中遗留下来的已完成克隆管道已经被识别出来,你必须在该循环中调用删除函数。下面展示了实现调用的示例代码:

var paramsd = {pipelineId: pipeline2delete /* required */};  
Data Pipeline.deletePipeline(paramsd, function(err, data) {
if (err) {console.log(err, err.stack); // an error occurred}
else console.log('Old clone deleted ' + pipeline2delete + ' Create new clone now');
});

最后,你需要进行三次API调用,从原来的Data Pipeline模板中创建一个新的克隆。下面是你可以使用的API:

  • getPipelineDefinition (for the finished pipeline)
  • createPipeline
  • putPipelineDefinition (from #1)

下面是这三次调用的示例:

1、使用管道定义创建下一个克隆:

var params = {pipelineId: pipeline};    
Data Pipeline.getPipelineDefinition(params, function(err, definition) {
if (err) console.log(err, err.stack); // an error occurred
else {
var params = {
name: 'myLambdaSample', /* required */
uniqueId: 'myLambdaSample' /* required */
}; <b>
</b>

2、使用来自定义对象的克隆定义:

Data Pipeline.createPipeline(params, function(err, pipelineIdObject) {  
if (err) console.log(err, err.stack); // an error occurred
else { //new pipeline created with id=pipelineIdObject.pipelineId
console.log(pipelineIdObject); // successful response
//Create and activate pipeline
var params = {
pipelineId: pipelineIdObject.pipelineId,
pipelineObjects: definition.pipelineObjects//(you can add parameter objects and values)

3、使用来自getPipelineDefinition API结果的定义:

Data Pipeline.putPipelineDefinition(params, function(err, data) {  
if (err) console.log(err, err.stack);
else {
Data Pipeline.activatePipeline(pipelineIdObject,
function(err, data) { //Activate the pipeline finally
if (err) console.log(err, err.stack);
else console.log(data);
});
}
});
}});
}});

现在你具备了Lambda函数所需的所有函数调用过程。你也可以执行下面的步骤将这些调用过程打包成一个独立的函数:

输入Handler字段的值作为函数(LambdaDP.index)的名称。

Role。该选项可以使你访问像S3和Data Pipeline这样的资源。

  • 保留MemoryTimeout的默认值。
  • 选择Next,检查函数,选择Create function
  • Event source字段中,选择S3
  • 提供管道所使用的桶的名称。
  • Event type字段,选择Put。在新文件被提交到桶中时,该选项会激活管道。
  • 保存管道,上传一个数据文件到S3桶中。
  • 检查Data Pipeline控制台,确保新管道已经创建完毕并已被激活(管道完成后,你应该能收到一条SNS通知消息)。

 

   
次浏览       
相关文章

基于EA的数据库建模
数据流建模(EA指南)
“数据湖”:概念、特征、架构与案例
在线商城数据库系统设计 思路+效果
 
相关文档

Greenplum数据库基础培训
MySQL5.1性能优化方案
某电商数据中台架构实践
MySQL高扩展架构设计
相关课程

数据治理、数据架构及数据标准
MongoDB实战课程
并发、大容量、高性能数据库设计与优化
PostgreSQL数据库实战培训
最新活动计划
LLM大模型应用与项目构建 12-26[特惠]
QT应用开发 11-21[线上]
C++高级编程 11-27[北京]
业务建模&领域驱动设计 11-15[北京]
用户研究与用户建模 11-21[北京]
SysML和EA进行系统设计建模 11-28[北京]

MySQL索引背后的数据结构
MySQL性能调优与架构设计
SQL Server数据库备份与恢复
让数据库飞起来 10大DB2优化
oracle的临时表空间写满磁盘
数据库的跨平台设计
更多...   


并发、大容量、高性能数据库
高级数据库架构设计师
Hadoop原理与实践
Oracle 数据仓库
数据仓库和数据挖掘
Oracle数据库开发与管理


GE 区块链技术与实现培训
航天科工某子公司 Nodejs高级应用开发
中盛益华 卓越管理者必须具备的五项能力
某信息技术公司 Python培训
某博彩IT系统厂商 易用性测试与评估
中国邮储银行 测试成熟度模型集成(TMMI)
中物院 产品经理与产品管理
更多...