在上一篇环境设置的里,我们提供了一个分布式做ETL处理的例子。等价于实现了一个 Python UDF。 在这一篇中,我们会详细介绍使用 Byzer-pyhton

演示数据准备

set jsonStr='''
{"features":[5.1,3.5,1.4,0.2],"label":0.0},
{"features":[5.1,3.5,1.4,0.2],"label":1.0},
{"features":[5.1,3.5,1.4,0.2],"label":0.0},
{"features":[4.4,2.9,1.4,0.2],"label":0.0},
{"features":[5.1,3.5,1.4,0.2],"label":1.0},
{"features":[5.1,3.5,1.4,0.2],"label":0.0},
{"features":[5.1,3.5,1.4,0.2],"label":0.0},
{"features":[4.7,3.2,1.3,0.2],"label":1.0},
{"features":[5.1,3.5,1.4,0.2],"label":0.0},
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
''';
load jsonStr.`jsonStr` as data;

保存至数据湖:

save overwrite data as delta.`example.mock_data`;

加载数据湖里的 mock_data,并做简单的处理,得到 sample_data

load delta.`example.mock_data` as example_data;
select features[0] as a ,features[1] as b from example_data
as sample_data;

1. Byzer-python 处理数据

如果数据规模不大,可以在 Byzer-Notebook 中使用如下 Python 脚本对表 sample_data 进行处理:

#%python
#%input=sample_data
#%output=python_output_table
#%schema=st(field(_id,string),field(x,double),field(y,double))
#%runIn=driver
#%dataMode=model
#%cache=true
#%pythonExec=/home/winubuntu/miniconda3/envs/byzerllm-desktop/bin/python
#%env=source /home/winubuntu/miniconda3/bin/activate byzerllm-desktop

import ray
from pyjava.api.mlsql import RayContext

ray_context = RayContext.connect(globals(), None)
rows = RayContext.collect_from(ray_context.data_servers())
id_count = 1

def handle_record(row):
    global id_count
    item = {"_id": str(id_count)}
    id_count += 1
    item["x"] = row["a"]
    item["y"] = row["b"]
    return item

result = [handle_record(row) for row in rows]
context.build_result(result)
''';

上面的代码如果去Byzer-Notebook 提供的语法糖的话,会长成这个样子:

!python env "PYTHON_ENV=source /home/winubuntu/miniconda3/bin/activate byzerllm-desktop";
!python conf "runIn=driver"
!python conf "dataMode=model";
!python conf "schema=st(field(_id,string),field(x,double),field(y,double)

run command as Ray.`` where 
inputTable="sample_data"
and outputTable="python_output_table"
and code='''
import ray
from pyjava.api.mlsql import RayContext

ray_context = RayContext.connect(globals(), None)
rows = RayContext.collect_from(ray_context.data_servers())
id_count = 1

def handle_record(row):
    global id_count
    item = {"_id": str(id_count)}
    id_count += 1
    item["x"] = row["a"]
    item["y"] = row["b"]
    return item

result = [handle_record(row) for row in rows]
context.build_result(result)
''';

2. Byzer-python 代码说明

注意到 Python 脚本以字符串参数形式出现在代码中,这是 Byzer-Python 代码的一个模版。其中参数 inputTable 指定需要处理的表,没有需要处理的表时,可设置为 command ;参数 outputTable 指定输出表的表名;参数 code 为需要执行的 Python 脚本。

run command as Ray.`` where 
inputTable="sample_data"
and outputTable="python_output_table"
and code='''
import ray
......
''';

传入的 Python 脚本:

## 引入必要的包
import ray
from pyjava.api.mlsql import RayContext

## 获取 ray_context,如果需要使用 Ray 集群,那么第二个参数填写集群 Master 节点的地址
## 否则设置为None就好。
ray_context = RayContext.connect(globals(), None)

# 通过ray_context.data_servers() 获取所有数据源,如果开启了Ray,那么就可以
# 分布式获取这些数据进行处理。
rows = RayContext.collect_from(ray_context.data_servers())
id_count = 1

## 从 java 端接受的数据格式也是list(dict),也就是说,每一行的数据都以字典的数据结构存储。
## 比如 sample_data 的数据,在 Python 端拿到的结构就是
## [{'a':'5.1','b':'3.5'}, {'a':'5.1','b':'3.5'}, {'a':'5.1','b':'3.5'} ...] 
## 基于这个数据结构,我们对输入数据进行数据处理
def handle_record(row):
    global id_count
    item = {"_id": str(id_count)}
    id_count += 1
    item["x"] = row["a"]
    item["y"] = row["b"]
    return item

result = [handle_record(row) for row in rows]

## 此处 result 是一个迭代器,context.build_result 也支持传入生成器/数组
context.build_result(result)

3. Byzer-python 读写 Excel 文件

Python 有很多处理 Excel 文件的库,功能成熟完善,您可以在 Byzer-python 环境中安装相应的库来处理您的 Excel 文件。这里以 pandas 为例来读取和保存 Excel 文件(需要安装 xlrd/xlwt 包,pip install xlrd==1.2.0 xlwt):

-- 将上文 sample_data 保存成 Excel 文件

!python env "PYTHON_ENV=source activate dev";
!python conf "schema=st(field(file,binary))";
!python conf "dataMode=model";
!python conf "runIn=driver";
run command as Ray.`` where 
inputTable="sample_data"
and outputTable="excel_data"
and code='''
import io
import ray
import pandas as pd
from pyjava.api.mlsql import RayContext, PythonContext

ray_context = RayContext.connect(globals(), None)

data = ray_context.to_pandas()

output = io.BytesIO()
writer = pd.ExcelWriter(output, engine='xlwt')
data.to_excel(writer, index=False)
writer.save()
xlsx_data = output.getvalue()

context.build_result([{"file":xlsx_data}])
''';
!saveFile _ -i excel_data -o /tmp/sample_data.xlsx; 
-- 读取 sample_data.xlsx 文件

load binaryFile.`/tmp/sample_data.xlsx` as excel_table;

!python env "PYTHON_ENV=source activate dev";
!python conf "schema=st(field(a,double),field(b,double))";
!python conf "dataMode=model";
!python conf "runIn=driver";
run command as Ray.`` where 
inputTable="excel_table"
and outputTable="excel_data"
and code='''
import io
import ray
from pyjava.api.mlsql import RayContext
import pandas as pd

ray_context = RayContext.connect(globals(),None)

file_content = ray_context.to_pandas().loc[0, "content"]

df = pd.read_excel(io.BytesIO(file_content))
data = [row for row in df.to_dict('records')]
context.log_client.log_to_driver(data)
context.build_result(data)
''';

4. Byzer-python 分布式计算

分布式处理依赖 Ray 环境,您可以参考Ray 环境搭建 搭建 Ray 集群。这里我们简单介绍下如何使用 Pyjava 高阶 API 使用 Ray 完成分布式计算:

!python env "PYTHON_ENV=source activate dev";
!python conf "schema=st(field(_id,string),field(x,double),field(y,double))";
!python conf "dataMode=model";
!python conf "runIn=driver";

run command as Ray.`` where 
inputTable="sample_data"
and outputTable="python_output_table"
and code='''
import ray
from pyjava import rayfix
from pyjava.api.mlsql import RayContext
import socket

## 获取 ray_context,这里需要使用 Ray,第二个参数填写 Ray head-node 的地址和端口
ray_context = RayContext.connect(globals(), '127.0.0.1:10001')

## Ray 集群分布式处理
@ray.remote
@rayfix.last
def handle_record(servers):

    datas = RayContext.collect_from(servers)
    
    result = []
    for row in datas:
        item = {"_id": socket.gethostname()}
        item["x"] = row["a"]
        item["y"] = row["b"]
        result.append(item)
    return result
    
data_servers = ray_context.data_servers()
res =  ray.get(handle_record.remote(data_servers))
## 构造结果数据返回
context.build_result(res)
''';

5. Byzer-python 图表绘制

您可以在 Byzer 桌面版 和 Bzyer Notebook 中使用 Python 绘图包(matplotlibplotlypyecharts 等,需要提前安装)绘制精美的图表,并用 Byzer-python 提供的 API 输出图片:

-- 绘图数据
set jsonStr='''
{"Busn_A":114,"Busn_B":57},
{"Busn_A":55,"Busn_B":134},
{"Busn_A":27,"Busn_B":137},
{"Busn_A":101,"Busn_B":129},
{"Busn_A":125,"Busn_B":145},
{"Busn_A":27,"Busn_B":60},
{"Busn_A":105,"Busn_B":49}
''';
load jsonStr.`jsonStr` as data;

使用 pyecharts 绘制图表:

!python env "PYTHON_ENV=source activate dev";
!python conf "schema=st(field(content,string),field(mime,string))";
!python conf "dataMode=model";
!python conf "runIn=driver";

run command as Ray.`` where 
inputTable="data"
and outputTable="plt"
and code='''
from pyjava.api.mlsql import RayContext,PythonContext
from pyecharts import options as opts
import os
from pyecharts.charts import Bar

ray_context = RayContext.connect(globals(),None)

data = ray_context.to_pandas()
data_a = data['Busn_A']
data_b = data['Busn_B']

# 基本柱状图
bar = Bar()
bar.add_xaxis(["Shirt", "Sweater", "Tie", "Pants", "Hat", "Gloves", "Socks"])


bar.add_yaxis("Saler A", list(data_a))
bar.add_yaxis("Saler B", list(data_b))
bar.set_global_opts(title_opts=opts.TitleOpts(title="Sales Info"))
bar.render('bar_demo.html')  # 生成html文件
html = ""
with open("bar_demo.html") as file:
   html = "\n".join(file.readlines())
os.remove("bar_demo.html")
context.build_result([{"content":html,"mime":"html"}])
''';

image-plot

使用 matplotlib 绘制图表:

!python env "PYTHON_ENV=source activate dev";
!python conf "schema=st(field(content,string),field(mime,string))";
!python conf "dataMode=model";
!python conf "runIn=driver";

run command as Ray.`` where 
inputTable="data"
and outputTable="plt"
and code='''
from pyjava.api.mlsql import RayContext,PythonContext
import matplotlib.pyplot as plt
import numpy as np
from pyjava.api import Utils
ray_context = RayContext.connect(globals(),None)

data = ray_context.to_pandas()


labels = ["Shirt", "Sweater", "Tie", "Pants", "Hat", "Gloves", "Socks"]
men_means = data['Busn_A']
women_means = data['Busn_B']

x = np.arange(len(labels))  # the label locations
width = 0.35  # the width of the bars

fig, ax = plt.subplots()
rects1 = ax.bar(x - width/2, men_means, width, label='Saler A')
rects2 = ax.bar(x + width/2, women_means, width, label='Saler B')

# Add some text for labels, title and custom x-axis tick labels, etc.
ax.set_ylabel('Sales')
ax.set_title('Sales Info')
ax.set_xticks(x)
ax.set_xticklabels(labels)
ax.legend()


def autolabel(rects):
    """Attach a text label above each bar in *rects*, displaying its height."""
    for rect in rects:
        height = rect.get_height()
        ax.annotate('{}'.format(height),
                    xy=(rect.get_x() + rect.get_width() / 2, height),
                    xytext=(0, 3),  # 3 points vertical offset
                    textcoords="offset points",
                    ha='center', va='bottom')


autolabel(rects1)
autolabel(rects2)

fig.tight_layout()

Utils.show_plt(plt, context)
''';

image-plot2

Logo

更多推荐