Mara-pipelines是一个轻量级的数据转换框架,具有透明、低复杂度的特点。其他特点如下:基于非常简单的Python代码即可完成流水线开发。使用PostgreSQL作为数据处理引擎。有一个Web界面可以可视化和分析管道的执行过程。基于Python的多处理独立管道执行。不需要分布式任务队列。轻松调试和输出日志。基于成本的优先级队列:成本较高的节点(基于记录的运行时间)首先运行。另外,在Mara-pipelines的web界面中,您不仅可以查看和管理管道及其任务节点,还可以直接触发这些管道和节点,非常好用:1.安装Mara-Pipelines不支持Windows,如果需要在Windows上使用Mara-pipelines,请使用Docker或Windows下的linux子系统。使用pip安装Mara-pipelines:pipinstallmara-pipelines或:pipinstallgit+https://github.com/mara/mara-pipelines.git2。使用示例这是一个基本的流水线演示,由三个相互依赖的Node组成,包括任务1(ping_localhost)、子流水线(sub_pipeline)、任务2(sleep):#注意这个例子中使用了一些国外网站。如果访问不了,请换国内网站。frommara_pipelines.commands.bashimportRunBashfrommara_pipelines.pipelinesimportPipeline,Taskfrommara_pipelines.ui.cliimportrun_pipeline,run_interactivelypipeline=Pipeline(id='demo',description='一个演示管道、任务和命令之间相互作用的小管道')pipeline.add(Task(id='ping_localhost',description='Pingslocalhost',commands=[RunBash('ping-c3localhost')]))sub_pipeline=Pipeline(id='sub_pipeline',description='Pings['google','amazon','facebook']中的一些主机'):sub_pipeline.add(Task(id=f'ping_{host}',description=f'Pings{host}',命令=[RunBash(f'ping-c3{host}.com')]))sub_pipeline.add_dependency('ping_amazon','ping_facebook')sub_pipeline.add(Task(id='ping_foo',description='Pingsfoo',commands=[RunBash('pingfoo')]),['ping_amazon'])pipeline.add(sub_pipeline,['ping_localhost'])pipeline.add(Task(id='sleep',description='Sleepsfor2seconds',commands=[RunBash('sleep2')]),['sub_pipeline'])可以看出Task中包含了多个命令,这些命令将用于real中pipeline.add的参数,第一个参数是它的节点,第二个参数是这个节点的上游。例如:pipeline.add(sub_pipeline,['ping_localhost'])表示执行完ping_localhost后必须执行sub_pipeline。为了运行这个管道,需要配置一个PostgreSQL数据库来存储运行时信息、操作输出和增量处理状态:dbs.PostgreSQLDB(host='localhost',user='root',database='example_etl_mara')}mara_db.auto_migration.auto_discover_models_and_migrate()如果PostgresSQL正在运行且账号密码正确,则输出如下(创建了一个数据库有多个表):已创建数据库“postgresql+psycopg2://root@localhost/example_etl_mara”创建表data_integration_file_dependency(node_pathTEXT[]NOTNULL,dependency_typeVARCHARNOTNULL,散列VARCHAR,时间戳TIMESTAMPWITHOUTTIMEZONE,KEY_path,KEY_pathdependency_type));..moretables为了运行这条流水线,你需要:frommara_pipelines.ui.cliimportrun_pipelinerun_pipeline(pipeline)这将运行单个管道节点和它的所有节点(sub_pipeline)取决于:run_pipeline(sub_pipeline,nodes=[sub_pipeline.nodes['ping_amazon']],with_upstreams=True)3.Web界面我认为mara-pipelines最有用的一点是它们提供了一个基于Flask的web界面来控制管道。对于每个管道,它们都有一个页面显示:所有子节点及其之间依赖关系的图表过去30天内具有最昂贵节点(可配置)的关系管道的总体运行时间图所有管道节点及其平均运行时间的表以及由此产生的排队优先级流水线最后一次运行的输出和时间线对于每一个任务,都有一个页面显示最近30天流水线中任务上游和下游的任务运行时间。任务的所有命令的输出和任务的最后一次运行。此外,管道和任务可以直接从网页调用和运行,这是一个很棒的特性:到此我们的文章结束。如果喜欢今天的Python实战教程,请关注公众号:Python编程学习圈,了解更多编程技术干货,送“J”获取海量学习资料!
