202412

curl 证书报错

curl 提示这个错

curl: (60) SSL certificate problem: self signed certificate in certificate chain
More details here: https://curl.haxx.se/docs/sslcerts.html

curl failed to verify the legitimacy of the server and therefore could not
establish a secure connection to it. To learn more about this situation and
how to fix it, please visit the web page mentioned above.

这个错误表示 curl 在建立 HTTPS 连接时,无法验证服务器提供的 SSL/TLS 证书。错误的主要原因是服务器的证书链中有一个自签名证书,无法通过 curl 默认的信任存储(CA 根证书)验证。

以下是解决方法的几种方式:


1. 检查服务器的证书链

  • 确保服务器提供的证书是由受信任的 CA 签发的,而不是自签名证书。

  • 你可以使用以下命令检查服务器证书链:

    1
    openssl s_client -connect <server>:443 -showcerts

    其中 <server> 替换为目标服务器的域名。如果发现证书链中有问题(比如缺少中间证书),需要联系服务器管理员修复。


2. 忽略 SSL 证书验证

如果你明确知道连接的服务器是安全的,可以临时忽略证书验证:

1
curl -k https://<url>

但这种方法不推荐用于生产环境,因为它会跳过安全检查。


3. 指定自定义 CA 文件

如果你有服务器的证书或受信任的 CA 文件,可以通过 --cacert 参数指定:

1
curl --cacert /path/to/ca.pem https://<url>

4. 添加自签名证书到系统信任存储

  • 下载服务器的自签名证书(或从管理员获取)。
  • 将证书添加到系统的 CA 信任存储:
    • Linux:

      1. 将证书放入 /usr/local/share/ca-certificates/ 目录。
      2. 执行以下命令更新 CA 存储:
        1
        sudo update-ca-certificates
    • macOS:

      1. 使用 Keychain Access 工具将证书导入,并设置为信任。
    • Windows:

      1. 使用 管理工具 > 证书 添加到“受信任的根证书颁发机构”。

5. 更新 curl 和系统根证书

  • 确保 curl 和操作系统的 CA 存储是最新的:
    • Linux:

      1
      2
      sudo apt update && sudo apt upgrade
      sudo apt install --reinstall ca-certificates
    • macOS:
      使用 brew 更新 curl

      1
      2
      brew update
      brew upgrade curl
    • Windows:
      重新安装 curl 或更新操作系统。


6. 环境变量

如果证书链中有问题,可以使用以下环境变量忽略验证(临时措施):

1
2
export CURL_SSL_BACKEND=openssl
export SSL_CERT_FILE=/path/to/ca.pem

ES 集群中节点内存不一致的问题

三个节点的ES集群,其中两台机器128G内存,有一台只有32G内存,在索引数据的时候内存小的这台机器经常被oom killer杀掉,如何解决这个问题?

在 Elasticsearch 集群中,如果节点之间的硬件配置差异较大(例如内存),会导致负载分配不均的问题。特别是索引数据时,资源不足的节点容易因内存溢出(OOM)被系统杀掉。以下是解决这一问题的具体方法:


1. 优化 JVM 内存分配

确保为每个节点的 JVM 堆内存合理配置,避免过高或过低的设置:

  • JVM 堆内存大小建议为 物理内存的 50%,但不超过 30GB:
    • 128G 内存的节点:设置为 -Xms64g -Xmx64g
    • 32G 内存的节点:设置为 -Xms16g -Xmx16g

修改 Elasticsearch 配置文件:

1
vim /etc/elasticsearch/jvm.options

调整如下:

1
2
-Xms16g
-Xmx16g

重启 Elasticsearch:

1
sudo systemctl restart elasticsearch

2. 调整索引和负载分配策略

(1)限制分片分配到内存较小的节点

你可以通过设置节点的标签,限制索引分片分配到资源有限的节点:

  1. 在内存小的节点(32G)上,添加标签:

    1
    vim /etc/elasticsearch/elasticsearch.yml

    增加以下配置:

    1
    node.attr.low_memory: true
  2. 为索引指定分片分配规则,避免分片分配到标记为 low_memory 的节点:

    1
    2
    3
    4
    5
    6
    curl -XPUT "http://<ES_HOST>:9200/my_index/_settings" -H 'Content-Type: application/json' -d'
    {
    "settings": {
    "index.routing.allocation.exclude.low_memory": "true"
    }
    }'

(2)调整分片大小

如果分片过大,索引过程中的内存消耗会显著增加。将每个分片的大小限制在 20-40GB

  • 计算合理的分片数量:

    1
    分片数 = 索引总大小 / 每个分片大小
  • 在创建索引时指定分片数:

    1
    2
    3
    4
    5
    6
    7
    curl -XPUT "http://<ES_HOST>:9200/my_index" -H 'Content-Type: application/json' -d'
    {
    "settings": {
    "number_of_shards": 5,
    "number_of_replicas": 1
    }
    }'

3. 使用协调节点

如果内存小的节点经常被杀死,考虑让它作为一个协调节点,仅负责处理请求的路由,而不存储数据或参与索引操作。

  1. 修改小内存节点的配置文件:

    1
    vim /etc/elasticsearch/elasticsearch.yml

    配置为协调节点:

    1
    2
    3
    node.master: false
    node.data: false
    node.ingest: false
  2. 重启节点后,它将作为一个轻量级的协调节点。


4. 增加虚拟内存

在资源有限的节点上,启用和增加虚拟内存(swap),避免 OOM:

  1. 检查当前 swap 配置:

    1
    free -h
  2. 如果没有启用 swap,创建一个 swap 文件:

    1
    2
    3
    4
    sudo fallocate -l 8G /swapfile
    sudo chmod 600 /swapfile
    sudo mkswap /swapfile
    sudo swapon /swapfile
  3. 持久化配置:

    1
    echo '/swapfile none swap sw 0 0' | sudo tee -a /etc/fstab

5. 优化查询和索引操作

(1)批量索引

确保客户端程序使用 Bulk API 进行批量写入,并限制单次请求的大小(如 5MB 或 500 文档)。

(2)调整刷新间隔

增加刷新间隔以减少频繁的磁盘 I/O:

1
2
3
4
5
6
curl -XPUT "http://<ES_HOST>:9200/my_index/_settings" -H 'Content-Type: application/json' -d'
{
"settings": {
"index.refresh_interval": "30s"
}
}'

6. 监控和调整

  • 使用 KibanaMetricbeat 监控节点的内存、CPU 和 I/O 使用情况。
  • 动态调整堆内存、分片分配和刷新策略。

7. 增加硬件资源

如果内存小的节点仍然无法满足需求,考虑增加其内存或替换为硬件配置一致的节点。

让被 OOM killer 杀掉的服务自动重启

要让 Elasticsearch 服务在被 OOM killer 杀掉后自动重启,可以通过调整 systemd 的服务配置来实现。以下是具体步骤:


1. 修改 systemd 服务配置

编辑 Elasticsearch 的 systemd 单元文件或添加覆盖配置:

步骤 1: 创建覆盖文件

运行以下命令创建或修改覆盖文件:

1
sudo EDITOR=vim systemctl edit elasticsearch.service

步骤 2: 添加自动重启配置

在打开的编辑器中,添加以下内容:

1
2
3
[Service]
Restart=always
RestartSec=5
  • Restart=always: 表示无论因何种原因停止,服务都会重启。
  • RestartSec=5: 设置重启前的等待时间为 5 秒(可以根据需求调整)。

步骤 3: 保存并退出

保存文件并退出编辑器。


2. 重新加载 systemd 配置

运行以下命令以使更改生效:

1
2
sudo systemctl daemon-reload
sudo systemctl restart elasticsearch.service

3. 验证配置

检查是否启用了自动重启机制:

1
systemctl show elasticsearch.service | grep Restart

应看到输出类似:

1
2
Restart=always
RestartUSec=5s

4. 查看日志

测试时可以通过 journalctl 检查日志确认服务是否成功重启:

1
journalctl -u elasticsearch.service -f

完成上述步骤后,当 Elasticsearch 被 OOM killer 杀掉时,服务会在指定的时间后自动重启。

ES 进程被 OOM 杀掉

最终做了这几个操作先:

  1. swap 从 512M 增加到了 8G:fallocate -l 8G /swapfile
  2. java 堆外内存限制在 8G:-XX:MaxDirectMemorySize=8g
  3. 设置 systemd 的 ES 服务挂掉自动拉起:Restart=always RestartSec=5

没做的操作:

  1. 手工设置 -Xms -Xmx: 这个 ES 默认的自动配置应该就可以
  2. 设置节点的标签,限制索引分片分配到资源有限的节点:这样这个节点就废了,不存储数据了
  3. 调整分片大小: 目前 10 个分片属于合理范围,分片太多碎片会太多,分片太少,单个分片太大也吃内存
  4. 使用协调节点 node.master: false node.data: false:这个节点就太鸡肋了
  5. 使用 Bulk API 进行批量写入,并限制单次请求的大小:目前已经时 bulk 写入了,单词请求适中
  6. 增加硬件配置:物理机,增加不了内存
  7. 优化刷新时间 “refresh_interval”: “30s”: 这个之前已经设置了
  8. 限制搜索线程池 thread_pool.write.size: 2:感觉和 OOM 关系不大,没改
  9. 文件描述符限制 ulimit -n 65536:应该不是这个原因
  10. 增加 mmap 支持 sysctl -w vm.max_map_count=262144: 应该没关系
  11. 减少写入操作时的主分片分布到内存较小的节点 “index.routing.allocation.require._name”: “big_memory_node”:后续再使用
  12. 尝试调大 min_free_kbytes 和 watermark_scale_factor 让缓存尽快回收。:暂未调整,怕影响全局。

其它:

  • 像es这种,不光要占内存,写文件还要占用文件系统缓存(cache)。cache占了太大,回收不及时,触发了min水位线,就导致oom了。
    如果确实当时是cache占用过大,就不能通过调整jvm参数解决,需要调整内存(cache)回收策略,或者手工回收cache。
  • 机器当时有没有记录sar信息,可以看看当时的cache有多少。如果是centos,默认就开启了,可以看看故障时间点前后的cache使用量。
  • 尝试调大 min_free_kbytes 和 watermark_scale_factor,让缓存尽快回收。或者写个crontab,定期echo 3 > /proc/sys/vm/drop_caches
  • 可以调 watermark_scale_factor,拉大min和low的“间隔”,触发了 low 就开始异步回收了。
  • 看下 dmesg 里面有更详细日志 有当时系统内存大小,被 kill 的 java 进程的得分
  • 如果不是一直OOM,可以通过_nodes/stats/jvm这个API看看jvm使用情况, 尤其是内存飙升的时候, 这个版本的熔断功能应该是相对完善了,OMM也可能是一些操作引起的,比如比较重量级的查询,合理设置一下。

PHP 7/8 新特性

PHP 7.0 (2015)

  1. 标量类型声明:支持在函数参数和返回值中使用 intfloatstringbool
  2. 返回类型声明:函数可以显式声明返回值类型。
  3. 匿名类:支持使用 new class 创建匿名类。
  4. 太空船操作符 (<=>):简化比较逻辑。
  5. Null 合并操作符 (??):用于处理空值的默认值。
  6. 性能提升:Zend 引擎 3 实现,带来显著性能优化和更低的内存使用。
  7. 抽象语法树(AST):引入新的底层实现,为未来扩展和工具奠定基础。
  8. 致命错误转为异常:改进了错误处理,使致命错误以异常形式抛出。

PHP 7.1 (2016)

  1. 可空类型:通过前置 ? 支持参数或返回值可以为 null(如:?int)。
  2. 类常量可见性:支持 publicprotectedprivate 修饰类常量。
  3. 多异常捕获:通过 catch (Exception1 | Exception2 $e) 捕获多个异常类型。
  4. Void 返回类型:允许声明函数无返回值 (void)。
  5. 异步生成器:通过 yieldGenerator 提供更强的协程支持。
  6. 短数组解构:支持列表解构([$a, $b] = [1, 2])。

PHP 7.2 (2017)

  1. 参数类型的对象扩展:支持用 object 作为类型提示。
  2. 加密扩展更新:引入 Argon2 算法,用于更安全的密码哈希。
  3. 抽象方法覆盖:子类覆盖抽象方法时参数类型声明必须匹配。
  4. 多字节字符串增强mbstring 增强对 UTF-8 处理能力。

PHP 7.3 (2018)

  1. 灵活的 Heredoc 和 Nowdoc 语法:改进多行字符串的书写方式。
  2. 数组解构改进list() 可以用于数组的键值对。
  3. is_countable() 函数:用于判断变量是否可计数。
  4. JSON 扩展增强JSON_THROW_ON_ERROR 常量提供异常抛出方式处理错误。
  5. 引用赋值列表支持:支持 list($a, &$b) = ...

PHP 7.4 (2019)

  1. 箭头函数 (fn):简化单行匿名函数(fn($x) => $x * 2)。
  2. 类型属性:属性声明可直接带类型(如:public int $age;)。
  3. 弱引用:新增 WeakReference 类,支持更灵活的对象引用。
  4. Null 合并赋值操作符 (??=):简化赋值逻辑。
  5. 预加载(Preloading):支持在服务器启动时预加载代码,提升性能。
  6. 反射增强:更强的反射能力,例如对类属性类型的支持。
  7. 废弃动态属性:动态添加属性警告,增强代码规范性。

PHP 8.0

  1. 命名参数:调用函数时可以通过参数名指定值,顺序可变。
  2. 联合类型:使用 | 表示多个类型(e.g., int|float)。
  3. 属性(Attributes):引入原生注解功能。
  4. 匹配表达式(Match Expression):简洁替代 switch
  5. Nullsafe 操作符:安全地访问可能为 null 的对象属性或方法(?->)。
  6. JIT 编译器:显著提升运行性能。
  7. 字符串中的数值比较更严格:解决宽松类型比较的问题。

PHP 8.1

  1. 只读属性(Readonly Properties):属性声明为只读后不可修改。
  2. 枚举类型(Enums):支持定义枚举类。
  3. Fibers:为协程提供更底层的 API。
  4. 交集类型(Intersection Types):用 & 表示的多个接口类型要求。
  5. 新初始化对象语法:支持 new 连续赋值(new ClassName()->property = value)。
  6. array_is_list() 函数:判断数组是否为列表。

PHP 8.2

  1. 只读类(Readonly Classes):声明类中所有属性为只读。
  2. 动态属性弃用:不再允许类动态添加属性,除非显式声明 #[\AllowDynamicProperties]
  3. 独立类型(Standalone Types):支持 nulltrue 等独立类型。
  4. 敏感参数红acted:支持通过 #[SensitiveParameter] 防止参数在错误中暴露。
  5. 数组解包支持字符串键:改进解包功能。

PHP 8.3

  1. json_validate() 函数:验证 JSON 字符串的有效性。
  2. 更灵活的 declare():支持块级作用域。
  3. 新随机扩展 API:更安全和现代的随机数生成。
  4. 负数索引支持 str_ends_with:改进字符串操作函数。

问题:如何过滤掉一个 csv 中第 3, 4, 5 列都是 0 的行

  • 尝试 1: 用 csvgrep -c 6 -m 0 --invert-match 过滤,失败,因为只能一次只能过滤一个列,多个管道后是 and 的逻辑,不是 or 的逻辑
  • 尝试 2: 用 csvsql --query "SELECT * FROM stdin WHERE NOT (c3 = 0 AND c4 = 0 AND c5 = 0)" 过滤,失败,csvsql 会把整个 csv 灌到 sqlite 里,过程特别慢,csv 太大内存也撑不住。
  • 尝试 3: 因为 3,4,5 列正好是最后三列,而且都是数字列,用 grep -Ev ',0,0,0$' 过滤,失败,-E 不认行尾标志 $,怀疑行尾有换行,用 grep -Ev ',0,0,0\n$'grep -Ev ',0,0,0\r$' 都不行,任何行都没过滤掉
  • 尝试 4:使用 grep -Pv ',0,0,0\n$' 失败, 使用 grep -Pv ',0,0,0\r$' 终于正常了,感觉每行有个 \r 没有去掉,但 grep -P 会很慢
  • 尝试 5:先删掉 \r 再用 grep -E 既能满足需求,速度也很快:tr -d '\r' | grep -Ev ',0,0,0$'

分析:

  • 可以用 od 命令查看 csv 的行结束符:head -n1 | od -xc,或者用 cat -A 查看不可见字符
  • grep -Egrep -P 都使用 $ 表示行尾, 但如果行尾有 \rgrep -E 会失败。
  • grep -E 不支持标准转义序列(例如 \n, \r 等),要使用八进制或十六进制表示:
    • echo -e "line1\r\nline2" | grep -E $'line1[\x0D]'
    • echo -e "line1\r\nline2" | grep -E $'line1\15'
    • 引号前面的 $ 是 Bash 的特殊符号,表示 ANSI C 风格字符串,和 grep 无关。
    • 包含 -r 的文本在终端里可能无法查看,需要过滤掉才能正常显示: echo -e "line1\r\nline2" | grep -E $'line1\15' | tr -d '\r'
  • grep -P 可以用 \r?$ 来兼容 CRLF 和 LF。

关于游戏出海CPC研究
https://www.luolink.com/article/11a298ea-4c9a-8037-a45a-e58b822b8285

免费 API
https://publicapis.io/

AI 搜索
https://www.perplexity.ai/search/i-have-a-blog-about-mysql-back-lIgz9FzIQC6pcreJccwDNg


问题:如何查看 MySQL 中最近一分钟执行了多少 CRUD 操作,平均耗时有多少?

背景:有时候看到 MySQL 的 CPU 占用特别高,使用 show process list 只能看到耗时较久的查询,
但看不到那些执行很快,但次数很多的查询,如果开 general_log 的话,对数据库性能影响又很大,
所以有时候我们需要查看最近一段时间,比如 1 分钟,或 10 分钟内,select, insert, update 的
请求量大概有多少,以便初步定位引起 CPU 高的原因。

分析:

  • events_statements_summary_global_by_event_name 表中有服务器启动依赖每个 event 执行的次数和花费的时间
  • 可以创建一个临时表,记录 events 表的快照
  • 然后 sleep 1 分钟,
  • 再把最新的 events 表的数据减去快照里的值,即可计算出这段时间每种操作的条数和平均执行时间。

代码如下:

DROP TEMPORARY TABLE IF EXISTS temp_queries;
CREATE TEMPORARY TABLE temp_queries AS
SELECT NOW() AS Sample_Time, EVENT_NAME,SUM_TIMER_WAIT AS Total_Time, COUNT_STAR AS Total_Queries
FROM performance_schema.events_statements_summary_global_by_event_name
WHERE EVENT_NAME like 'statement/sql%';

SELECT SLEEP(10);

with start as(
    SELECT * FROM temp_queries
),end as (
    SELECT NOW() AS Sample_Time, EVENT_NAME,SUM_TIMER_WAIT AS Total_Time, COUNT_STAR AS Total_Queries
    FROM performance_schema.events_statements_summary_global_by_event_name
    WHERE EVENT_NAME like 'statement/sql%'
), diff as(
    select start.EVENT_NAME, TIMESTAMPDIFF(SECOND, start.Sample_Time,end.Sample_Time) AS Seconds,
    (end.Total_Queries-start.Total_Queries) Querys,
    (end.Total_Time-start.Total_Time) Total_Time_Diff
    from end join start on end.EVENT_NAME = start.EVENT_NAME
)
select Seconds, EVENT_NAME,Querys,Total_Time_Diff/1000000000000/Querys as AvgTime
from diff where Querys> 0 order by Querys desc;

查看具体的统计

select * from performance_schema.setup_consumers where name like 'events_statements%';
update performance_schema.setup_consumers set enabled = 'YES' where name ='events_statements_history_long';
UPDATE performance_schema.setup_instruments SET ENABLED = 'NO', TIMED = 'NO' WHERE NAME in ('statement/com/Quit', 'statement/sql/change_db','statement/sql/set_option','statement/com/Close stmt','statement/com/Prepare','statement/com/Execute');

select EVENT_NAME,count(*) from performance_schema.events_statements_history_long group by EVENT_NAME order by count(*) desc limit 10;
select * from performance_schema.events_statements_history_long where EVENT_NAME like 'statement/sql/%' limit 10\G
select CURRENT_SCHEMA,count(*) from performance_schema.events_statements_history_long group by CURRENT_SCHEMA;

select DIGEST_TEXT, count(*) count, ROUND(AVG(TIMER_WAIT/1000000000000),2) avg_time 
from performance_schema.events_statements_history_long 
where DIGEST_TEXT is not null 
group by DIGEST_TEXT order by count(*) desc 
limit 10\G


UPDATE performance_schema.setup_instruments SET ENABLED = 'YES', TIMED = 'YES';
update performance_schema.setup_consumers set enabled = 'NO' where name ='events_statements_history_long';

general log

SHOW VARIABLES LIKE 'general_log_file';
SET GLOBAL general_log = 'ON';
SET GLOBAL general_log = 'OFF';

cloudflare D1 database quick start
https://developers.cloudflare.com/d1/get-started/

使用 eBPF 诊断服务网格网络性能
https://skywalking.apache.org/zh/diagnose-service-mesh-network-performance-with-ebpf/

什么是波动率锥?如何用波动率锥设计期权策略?
https://zhuanlan.zhihu.com/p/74766025

使用隐含波动率和历史波动率设计这样一个交易策略:

  • 当 HV 大于波动率锥 70%分位数时,卖出跨式期权组合;
  • 当HV < 70%分位时,则比较 HV 与 IV 的大小关系,若 IV < HV,则买入跨式期权组合;
  • 若IV−HV > ε,则卖出 跨 式 期 权 组 合 。根据 IV 与 HV 的 关 系 , ε 取 max(IV 与 HV 差的均值,0)。

期权卖方盈利终极指南-第五章 深入了解波动率
https://mp.weixin.qq.com/s?__biz=MzkyMzA5MzgwNQ==&mid=2247483778&idx=1&sn=a567c72ac3cdf1ef67b89ea178124500&chksm=c1eb1fc0f69c96d6d608e366669c1bfaffe245827b9a24560315952fd6a7f7030d7b010dfbf9&cur_album_id=2782448054980640768&scene=189#wechat_redirect

期权卖方盈利终极指南-第六章 专家级的日历价差教程
https://mp.weixin.qq.com/s/ZuwCyvSQ2LKc-w2wPSSrtw


《MySQL 101》教程提纲

第1章:简介

  1. MySQL是什么
    • 数据库的定义和用途
    • MySQL的特点与优势
    • MySQL的常见应用场景
  2. MySQL架构概览
    • MySQL的客户端-服务器模型
    • 存储引擎的概念(InnoDB vs MyISAM)

第2章:安装与设置

  1. 安装MySQL
    • Windows平台安装
    • macOS平台安装
    • Linux平台安装(APT/YUM/Docker)
  2. 基本配置
    • 配置文件(my.cnf/my.ini)详解
    • 数据目录和日志文件
  3. 启动与关闭服务
    • 使用命令行管理MySQL服务
    • 常见启动参数
  4. 连接到MySQL
    • 使用命令行客户端
    • GUI工具(如MySQL Workbench)简介

第3章:基础操作

  1. 数据库与表
    • 创建数据库和表
    • 删除数据库和表
    • 查看已有数据库和表
  2. 基本数据操作
    • 插入数据(INSERT语句)
    • 查询数据(SELECT语句)
    • 更新数据(UPDATE语句)
    • 删除数据(DELETE语句)
  3. 数据类型
    • 数值类型
    • 字符串类型
    • 日期和时间类型

第4章:高级查询

  1. 查询条件
    • WHERE子句
    • 比较运算符与逻辑运算符
  2. 排序与分页
    • 使用ORDER BY排序
    • LIMIT实现分页
  3. 聚合与分组
    • COUNT, SUM, AVG等聚合函数
    • GROUP BY与HAVING的用法
  4. 多表查询
    • JOIN语法(INNER JOIN, LEFT JOIN, RIGHT JOIN)
    • 子查询的使用

第5章:索引与性能优化

  1. 索引基础
    • 索引的种类(主键索引、唯一索引、普通索引)
    • 创建与删除索引
    • 索引的作用与限制
  2. 查询优化
    • 使用EXPLAIN分析查询
    • 优化SQL语句的技巧
  3. 数据库设计优化
    • 规范化与反规范化
    • 合理设计表结构

第6章:事务与锁

  1. 事务基础
    • 什么是事务
    • 事务的ACID特性
    • 使用BEGIN, COMMIT, ROLLBACK控制事务
  2. 锁机制
    • 行锁与表锁
    • 死锁的定义与处理

第7章:存储引擎

  1. 存储引擎概览
    • InnoDB与MyISAM的对比
    • 如何选择合适的存储引擎
  2. 深入InnoDB
    • 支持事务与外键
    • 表空间与文件结构

第8章:用户与权限管理

  1. 用户管理
    • 创建与删除用户
    • 修改用户密码
  2. 权限控制
    • 权限的种类
    • 授权与撤销权限
    • 使用SHOW GRANTS查看权限

第9章:备份与恢复

  1. 备份方法
    • 逻辑备份(mysqldump)
    • 物理备份(使用Xtrabackup等工具)
  2. 数据恢复
    • 恢复单个表或整个数据库
    • 处理备份与恢复中的常见问题

第10章:高可用与集群

  1. 主从复制
    • 复制的原理与配置
    • 常见问题及解决方案
  2. 高可用架构
    • MySQL Cluster简介
    • 使用组复制(Group Replication)
    • ProxySQL与负载均衡

附录

  1. 常见问题与解决方法
  2. 实用工具推荐
    • 数据库管理工具(如phpMyAdmin、DBeaver)
    • 性能监控工具
  3. 参考文献与学习资源
    • 官方文档
    • 推荐的书籍与在线教程

https://developers.cloudflare.com/reference-architecture/diagrams/serverless/fullstack-application/


使用 redis 布隆过滤器对 csv 去重并插入 mysql

  • 使用 Python 的 csv 模块读取 CSV 文件,使用 pymysql 访问 MySQL 数据库。
  • 使用 Redis 的 RedisBloom 模块实现布隆过滤器去重。
  • 布隆过滤器提示不存在的,直接插入 mysql 。
  • 布隆过滤器提示存在,要去 mysql 里确认,确实不存在的也要插入到 mysql。
  • 每缓存 1000 行要插入的数据,批量一次插入到 mysql,提高插入效率,降低内存消耗。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import csv
import redis
import pymysql

# 连接到Redis
r = redis.Redis(host='localhost', port=6379, db=0)

# 连接到MySQL
connection = pymysql.connect(host='localhost',
user='your_username',
password='your_password',
database='your_database',
cursorclass=pymysql.cursors.DictCursor)

# 创建布隆过滤器
bloom_filter_name = 'bloom_filter'
error_rate = 0.01 # 假阳性率
initial_capacity = 10000000 # 初始容量
r.execute_command('BF.RESERVE', bloom_filter_name, error_rate, initial_capacity)

# 读取CSV文件并去重
csv_file_path = 'your_file.csv'
batch_size = 1000 # 批量插入的大小
unique_rows = [] # 存储去重后的数据

with open(csv_file_path, 'r', newline='', encoding='utf-8') as csvfile:
reader = csv.reader(csvfile)
headers = next(reader) # 读取表头
column_to_check_index = headers.index('Column1') # 需要去重的列名在表头中的索引

for row in reader:
item = row[column_to_check_index]
if r.execute_command('BF.EXISTS', bloom_filter_name, item) == 0:
# 如果布隆过滤器提示元素不存在,直接添加到布隆过滤器和unique_rows中
r.execute_command('BF.ADD', bloom_filter_name, item)
unique_rows.append(row)
else:
# 如果布隆过滤器提示元素可能存在,需要在MySQL中确认
with connection.cursor() as cursor:
# 构建查询SQL语句,这里需要根据实际表结构调整
sql = "SELECT * FROM your_table WHERE Column1 = %s"
cursor.execute(sql, (item,))
result = cursor.fetchone()
if not result:
# 如果MySQL中不存在该项,则添加到布隆过滤器和unique_rows中
r.execute_command('BF.ADD', bloom_filter_name, item)
unique_rows.append(row)

# 当unique_rows累计到1000行时,批量插入MySQL
if len(unique_rows) >= batch_size:
self.insert_into_mysql(connection, unique_rows)
unique_rows = [] # 清空列表以便下一批数据

# 插入剩余的数据
if unique_rows:
self.insert_into_mysql(connection, unique_rows)

# 定义批量插入MySQL的函数
def insert_into_mysql(connection, data):
try:
with connection.cursor() as cursor:
# 构建插入SQL语句,这里需要根据实际表结构调整
sql = "INSERT INTO your_table (Column1, Column2, ...) VALUES (%s, %s, ...)"
# 请根据实际情况替换列名和值
cursor.executemany(sql, [tuple(row) for row in data])
connection.commit()
except pymysql.Error as e:
print(f"Error inserting data into MySQL: {e}")
connection.rollback()

# 关闭数据库连接
connection.close()

统计每分钟执行 url 2xx 和 5xx 的应答数

tail -n1000000 /var/log/nginx/access.log | awk '
NF>=25&&$12~/api\/user/ {
    split($1, t, ":");
    currentMinute = t[1] ":" t[2];

    if (prevMinute != "" && currentMinute != prevMinute) {
        print prevMinute, count2xx[prevMinute], count5xx[prevMinute];
        delete count2xx[prevMinute];
        delete count5xx[prevMinute];
    }

    if (!(currentMinute in count2xx)) {
        count2xx[currentMinute] = 0;
        count5xx[currentMinute] = 0;
    }
    if ($2 ~ /^2[0-9][0-9]$/) count2xx[currentMinute]++;
    if ($2 ~ /^5[0-9][0-9]$/) count5xx[currentMinute]++;

    prevMinute = currentMinute;
}
END {
    if (prevMinute != "") {
        print prevMinute, count2xx[prevMinute], count5xx[prevMinute];
    }
}'

csv 过滤某列不为空的行

csvgrep -c 2 -r '.+' data.csv

过滤某列不符合某个规则的行,两种方式

csvgrep -c 5 -r '^(?!.+)' data.csv 
csvgrep -c 5 -i -r '.+' data.csv 

过滤第 5 列为空,但第 3 列不为空的行,只打印第 1,3 列。

csvgrep -c 5 -i -r '.+' data.csv  | csvgrep -c 3 -r '.+'| csvcut -c 1,3 | csvlook

mysql 高可用方案对比

在 MySQL 的高可用 (HA) 方案中,普通主从 (Master-Slave)、MHA (MySQL High Availability)、MM (双主模式 Master-Master)、MGR (MySQL Group Replication) 都是常见的架构设计,各自有不同的优缺点。以下是详细的综合对比:


1. 普通主从 (Master-Slave)

架构特点

  • 主库负责读写,备库仅用于读请求(需要读写分离)。
  • 数据通过 Binlog 复制到从库。

优点

  • 配置简单,实施成本低。
  • 备库可用于读分担,提升查询性能。
  • 容错性好,主库故障后可以手动切换到从库。

缺点

  • 主从切换需要人工干预或额外工具,切换过程有停机时间。
  • 数据延迟:从库可能有同步延迟。
  • 不支持自动故障恢复。

适用场景

  • 适合对高可用要求不太高的小型项目。
  • 读多写少的场景,通过多从库扩展读性能。

2. MHA (MySQL High Availability)

架构特点

  • 在普通主从架构的基础上,MHA 提供自动主从切换和故障恢复。
  • 包括 MHA Manager 和 MHA Node 两部分。
  • 切换时能最大程度保证数据一致性,快速提升新主库。

优点

  • 自动化主从切换,减少人工干预。
  • 数据一致性强,主库故障后从库能快速接管。
  • 与现有的主从架构兼容性高,部署较简单。

缺点

  • 写性能仍然受限于单主架构。
  • 切换过程中仍可能丢失少量事务。
  • 依赖第三方组件,官方不再维护更新。

适用场景

  • 需要高可用的中小型系统,要求尽可能减少停机时间。
  • 不需要太复杂的高可用架构的场景。

3. MGR (MySQL Group Replication)

架构特点

  • MySQL 官方推出的多主复制技术,支持多主或单主模式。
  • 节点间基于 Paxos 协议或 Raft 协议实现强一致性。
  • 自动故障检测和恢复,支持高并发。

优点

  • 数据一致性强,内置的自动化切换和故障恢复。
  • 无需额外组件,原生支持,安全性高。
  • 多主模式下支持多点写入。
  • 自动处理网络分区问题,支持事务级复制。

缺点

  • 配置复杂,对网络和硬件要求较高。
  • 写性能受限于强一致性协议。
  • 多主模式存在冲突处理问题,适合特定场景。

适用场景

  • 要求强一致性和高可用的分布式系统。
  • 对于需要横向扩展写能力的场景,单主模式推荐。

4. MM (双主模式 Master-Master)

架构特点

  • 两个主库同时支持写入,通过双向复制同步数据。
  • 双主通常配置成一个主库读写,另一个主库仅做备份。

优点

  • 支持双主写,切换速度快。
  • 两个主库均可承载业务流量,提升容灾能力。

缺点

  • 数据一致性风险:双向复制可能引发冲突或循环更新。
  • 配置和运维复杂,对业务有一定限制。
  • 不支持高并发写入,事务冲突较难处理。

适用场景

  • 需要快速切换主库,数据写入量不高的场景。
  • 双活中心场景,但一般结合业务限制写入区域以减少冲突。

综合对比表

方案 优点 缺点 适用场景
普通主从 配置简单、读写分离、成本低 手动切换,存在延迟,无法自动恢复 适合小型项目、读多写少场景
MHA 自动切换、快速恢复、一致性好 依赖第三方组件,维护更新少 中小型高可用需求系统
MGR 原生支持,强一致性,自动恢复 配置复杂,写性能受限 分布式系统,高并发写入场景
MM 快速切换,双主冗余 数据冲突风险,运维复杂 双活容灾中心,小写入业务场景

总结建议

  1. 普通主从适合预算有限或对高可用要求不高的项目。
  2. MHA是普通主从的升级,适合追求高可用但预算有限的场景。
  3. MGR是官方推荐的现代 HA 方案,适合对数据一致性要求高的场景。
  4. MM适合少量写入,快速主备切换场景,但需要谨慎设计业务逻辑。

那些上千页的计算机书,用什么姿势看?
https://zhuanlan.zhihu.com/p/371097638

es 设置副本分片

如何给elasticsearch的某个索引设置number_of_replicas为1

PUT /<index_name>/_settings
{
  "index": {
    "number_of_replicas": 1
  }
}

在分片调整期间集群是否为yellow状态,如何查看分片调整的进度

curl -X GET "http://<elasticsearch_host>:9200/_cat/recovery?v"
curl -X GET "http://<elasticsearch_host>:9200/_cat/shards?v"
curl -X GET "http://<elasticsearch_host>:9200/_cat/shards?v"

` curl -X GET “http://<elasticsearch_host>:9200/_cluster/allocation/explain” -H ‘Content-Type: application/json’ -d’
{
“index”: “my_index”,
“shard”: 0,
“primary”: false
}

curl -X GET “http://<elasticsearch_host>:9200/_cluster/health?pretty”

_cat/recovery 如何对显示的行和列进行过滤呀

curl -X GET "http://<elasticsearch_host>:9200/_cat/recovery?v&h=index,shard,stage,bytes_recovered,percent"
curl -X GET "https://<ip>/_cat/recovery?v&h=index,shard,stage,bytes_recovered,bytes_percent,source_host,target_host" | awk '$3=="index"||$3=="stage"'

You want to benchmark Elasticsearch? Then Rally is for you. It can help you with the following tasks:

  • Setup and teardown of an Elasticsearch cluster for benchmarking
  • Management of benchmark data and specifications even across Elasticsearch versions
  • Running benchmarks and recording results
  • Finding performance problems by attaching so-called telemetry devices
  • Comparing performance results

https://esrally.readthedocs.io/en/stable/index.html

ES 部分查询

我的一个ES索引,有3个node,这个索引有10个主分片,0个副本分片,当一个node挂掉后,如何仍然提供服务,只是搜索结果里少了挂掉node上的数据。

在 Elasticsearch 中,当一个节点挂掉时,如果索引的主分片分布在该节点上,且没有副本分片,默认情况下集群无法继续服务,因为 Elasticsearch 设计为提供一致性保障,不会返回部分数据。然而,如果明确需要支持部分数据查询,可以通过调整一些配置来实现,即允许部分分片返回结果。

以下是具体步骤和方法:


步骤 1: 启用 partial 查询结果

Elasticsearch 默认要求所有查询涉及的分片都必须可用。要允许返回部分分片的结果,可以在查询时设置 allow_partial_search_results 参数为 true

具体操作:

在查询时添加 allow_partial_search_results 参数,例如:

1
2
3
4
5
6
7
GET /your_index/_search
{
"allow_partial_search_results": true,
"query": {
"match_all": {}
}
}

这样,即使某些分片不可用,Elasticsearch 仍会返回其他可用分片的结果。

全局设置(可选):

如果需要所有查询默认支持部分结果,可以修改集群的全局设置:

1
2
3
4
5
6
PUT /_cluster/settings
{
"persistent": {
"search.allow_partial_results": true
}
}

步骤 2: 检查分片分布

当一个节点挂掉时,部分主分片可能会丢失。为了最大程度减少丢失的数据,可以均衡分片分布。

查看分片分布:

使用 _cat/shards 查看分片在哪些节点上:

1
curl -X GET "http://<elasticsearch_host>:9200/_cat/shards?v"

输出示例:

1
2
3
4
5
index    shard prirep state       docs   store ip         node
my_index 0 p STARTED 10000 50mb 192.168.1.1 node-1
my_index 1 p STARTED 20000 100mb 192.168.1.2 node-2
my_index 2 p STARTED 15000 75mb 192.168.1.3 node-3
...

确保主分片分布在不同节点上,以降低单点故障的影响。


步骤 3: 调整查询逻辑

在应用层处理查询结果时,确保对缺失的数据提供容错逻辑。

  1. 识别缺失的分片
    • 使用 _cat/shards_cluster/health 检查哪些分片未分配。
  2. 提示用户部分数据丢失
    • 在搜索结果中提示用户当前的查询结果可能不完整。

局限性和注意事项

  1. 数据一致性

    • 如果允许部分搜索结果,用户可能会误以为结果是完整的,因此需要在应用层提供明确的提示。
  2. 数据丢失

    • 挂掉的节点上的数据在没有副本分片时无法恢复。如果节点无法重启,数据会永久丢失。
  3. 提高容错能力

    • 为了避免这种情况,建议至少设置 number_of_replicas: 1,即使一个节点挂掉,副本分片也可以保证数据的完整性。

总结

要在 number_of_replicas=0 的情况下实现节点挂掉后仍然部分服务:

  1. 启用 allow_partial_search_results(全局或查询级别)。
  2. 确保分片尽量均匀分布在多个节点上。
  3. 在应用层处理查询结果的容错逻辑,并提示用户数据可能不完整。

设计一套高性能、高可扩展、高可用、高可伸缩的微服务架构,用于处理用户登录请求,需要从多个层面综合考虑,包括服务分层、基础设施、中间件选择、非功能需求等。

1. 总体架构设计

采用基于微服务的架构,将系统分解为若干独立的服务,登录请求由专门的认证与授权服务处理。其他基础服务(如数据库、缓存、消息队列等)和中间件提供支撑。

服务划分

  1. 用户服务
    • 处理用户信息的管理(如注册、查询、更新)。
  2. 认证服务
    • 专注于用户登录逻辑,包括身份验证(例如,用户名/密码验证,OAuth、OTP等)。
    • 支持多种认证方式。
  3. 授权服务
    • 生成和验证 JWT 或其他访问令牌,处理用户的访问权限。
  4. 审计和监控服务
    • 记录登录行为和安全事件,用于审计和安全分析。
  5. 网关服务
    • 作为 API 的入口,统一路由和转发请求。
  6. 配置管理服务
    • 统一管理微服务配置,支持动态调整。
  7. 日志服务
    • 收集和分析服务日志,便于排查问题。
  8. 负载均衡服务
    • 提供高效的负载均衡,分发请求到多个实例。

2. 技术选型

基础设施

  • 容器化:使用 Docker 容器化服务,提供一致的部署环境。
  • 容器编排:使用 Kubernetes(K8s)管理容器化服务,支持高可用和动态伸缩。

中间件

  1. API 网关
    • 工具:Kong, NGINX, Spring Cloud Gateway。
    • 功能:路由、负载均衡、限流、认证校验、跨域支持。
  2. 负载均衡
    • 工具:K8s Ingress, HAProxy, Envoy。
    • 功能:分发请求到不同节点。
  3. 缓存
    • 工具:Redis。
    • 功能:存储用户会话数据、验证码状态等,减少数据库访问压力。
  4. 数据库
    • 工具:PostgreSQL(事务性强),或 MySQL。
    • 数据分片:使用读写分离(主从架构)提高性能。
  5. 消息队列
    • 工具:RabbitMQ, Kafka。
    • 功能:解耦服务,用于异步任务处理(如登录日志记录、安全报警)。
  6. 分布式配置管理
    • 工具:Consul, Apache Zookeeper, Spring Cloud Config。
    • 功能:动态管理微服务配置。
  7. 身份认证和加密
    • 工具:Keycloak 或自定义身份认证服务。
    • 加密:使用 HMAC-SHA256 或 bcrypt 保护用户密码。

安全

  • 数据传输加密:通过 HTTPS 确保数据传输安全。
  • 用户密码加密:使用强散列算法(如 bcrypt)存储用户密码。
  • 多因子认证(MFA):支持 OTP、短信验证码等。
  • 防止暴力攻击
    • 基于 IP 的限流(如每分钟最多尝试登录 5 次)。
    • CAPTCHA 验证。

3. 详细设计

登录请求处理流程

  1. 用户请求登录

    • 用户通过前端页面提交用户名和密码。
    • 请求通过 API 网关转发到认证服务。
  2. 认证服务验证身份

    • 校验请求参数完整性。
    • 查询用户信息(从数据库或缓存)。
    • 验证密码是否正确(bcrypt 或 PBKDF2 散列)。
    • 检查用户状态(如是否被锁定、是否需要验证码验证)。
  3. 生成令牌

    • 认证通过后,生成 JWT 令牌(包含用户 ID、角色、过期时间等)。
    • 将登录状态写入 Redis 或数据库以便后续校验。
  4. 返回响应

    • 将令牌返回给前端,前端将其存储(如存储在 Cookie 或 Local Storage 中)。
  5. 日志与监控

    • 审计服务记录登录请求和结果。
    • 异常请求通过消息队列发送至安全服务进行分析。

高可用和高性能实现

水平伸缩

  • 服务容器化后,通过 Kubernetes 动态扩容服务实例。
  • 通过 HPA(Horizontal Pod Autoscaler)根据负载自动调整实例数量。

高可用

  • 数据库主从架构:主库写操作,从库读操作,提升读写性能。
  • 使用 Redis 集群:确保缓存系统的高可用性。
  • 消息队列镜像模式:RabbitMQ 的镜像队列模式保证消息的高可用性。

高性能

  • 请求限流
    • 通过 API 网关限流(如每秒最大请求数)。
  • 缓存查询
    • 用户登录状态和常用信息缓存到 Redis,减少数据库查询。
  • 异步处理
    • 审计、日志和安全报警等非关键任务通过消息队列异步处理。

4. 非功能需求

  1. 性能

    • 高并发支持:每秒支持上万次请求(通过水平扩展实现)。
    • 延迟低于 100ms:优化数据库访问、缓存查询等关键路径。
  2. 可用性

    • 服务可用性:99.99%(通过负载均衡、冗余和健康检查实现)。
    • 无单点故障:使用多副本和高可用架构设计。
  3. 安全性

    • 数据传输全加密(HTTPS)。
    • 多层防护:防止 SQL 注入、跨站脚本攻击(XSS)、跨站请求伪造(CSRF)。
    • 合规性:符合 GDPR 等隐私法规。
  4. 可伸缩性

    • 支持按需扩展:高峰期动态扩容,低峰期自动缩容。
    • 配置动态更新:通过配置管理服务无缝调整服务参数。

总结

这套微服务架构通过模块化的设计和云原生技术,能够满足高性能、高可扩展、高可用、高可伸缩的要求。关键在于服务划分清晰、基础中间件可靠,并辅以完善的安全和监控机制,确保系统在高并发场景下的稳定运行。

spark join 两个大文件

如何用spark找出两个csv文件种第2列相同的行,假设两个文件都很大,而且未排序

在 Spark 中处理两个未排序的大型 CSV 文件,并找出它们第 2 列相同的行,可以通过以下步骤实现:


步骤 1:加载 CSV 文件

Spark 提供了内置的 CSV 读取功能,可以使用 spark.read.csv 加载 CSV 文件。

1
2
3
4
5
6
7
8
9
10
from pyspark.sql import SparkSession

# 初始化 SparkSession
spark = SparkSession.builder \
.appName("Find Matching Rows in CSV") \
.getOrCreate()

# 加载 CSV 文件
file1 = spark.read.csv("file1.csv", header=True, inferSchema=True)
file2 = spark.read.csv("file2.csv", header=True, inferSchema=True)
  • header=True 表示 CSV 文件包含表头。
  • inferSchema=True 自动推断数据类型。

步骤 2:选择第 2 列

我们可以使用 DataFrame 的 select 方法选择文件中的第 2 列,并将其重命名为一个通用的列名。

1
2
3
# 选择第 2 列并重命名
file1_col2 = file1.select(file1.columns[1].alias("col2_file1"))
file2_col2 = file2.select(file2.columns[1].alias("col2_file2"))

步骤 3:寻找交集

使用 Spark 的 join 操作,以第 2 列为连接键进行内连接(inner join)。假设两个文件中第 2 列的值类型一致。

1
2
# 执行内连接,寻找交集
matching_rows = file1.join(file2, file1.columns[1] == file2.columns[1], "inner")
  • file1.columns[1]file2.columns[1] 分别表示第 2 列。
  • "inner" 表示内连接,只保留匹配的行。

步骤 4:输出结果

你可以将结果输出到文件或者在控制台中显示。

1
2
3
4
5
# 显示结果
matching_rows.show()

# 保存到文件
matching_rows.write.csv("output.csv", header=True)

优化建议

  1. 分布式计算

    • Spark 会自动将任务分布在集群中运行。确保两个文件被分区存储(例如存储在 HDFS 或 S3 中),以提高并行计算性能。
  2. 广播变量(适合一个文件较小的情况):
    如果其中一个文件较小(可以放入内存),可以将较小的文件广播到每个节点,使用 broadcast 优化连接操作。

    1
    2
    3
    4
    from pyspark.sql.functions import broadcast

    # 广播较小的文件
    matching_rows = file1.join(broadcast(file2), file1.columns[1] == file2.columns[1], "inner")
  3. 分区优化

    • 在读取 CSV 时设置合适的分区数。
    • 使用 repartitioncoalesce 调整分区数以优化性能。
    1
    file1 = file1.repartition(100)  # 将文件重新分区
  4. 列裁剪

    • 只读取和处理需要的列,以减少内存消耗。
    • 使用 select 筛选所需的列。
  5. 数据类型优化

    • 确保第 2 列的类型一致,例如都为 StringTypeIntegerType

完整代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

# 初始化 SparkSession
spark = SparkSession.builder \
.appName("Find Matching Rows in CSV") \
.getOrCreate()

# 加载 CSV 文件
file1 = spark.read.csv("file1.csv", header=True, inferSchema=True)
file2 = spark.read.csv("file2.csv", header=True, inferSchema=True)

# 执行内连接
matching_rows = file1.join(file2, file1.columns[1] == file2.columns[1], "inner")

# 显示结果
matching_rows.show()

# 保存结果到文件
matching_rows.write.csv("output.csv", header=True)

# 停止 SparkSession
spark.stop()

总结

  • Spark 提供了强大的并行计算能力,可以高效处理两个大型 CSV 文件。
  • 使用 join 操作找出第 2 列相同的行,并注意对数据类型和分区的优化。
  • 如果文件分布在分布式存储中(如 HDFS 或 S3),可以进一步利用数据局部性提升性能。

join之前先对两个文件排序是否会增加join的速度

在分布式计算中,是否对两个文件排序能加速 join 操作,取决于数据的特点和具体实现。以下是详细分析:

1. 排序是否有助于加速 join

排序本身并不总是必要,但在某些情况下,可以提高性能:

  • 无排序情况下的 join

    • Spark 默认会为 join 自动进行 shuffle,将参与 join 的列进行分区(基于键的哈希值)。
    • Shuffle 是一个昂贵的操作,涉及网络传输、磁盘 I/O 等。
  • 预排序情况下的 join

    • 如果数据已按 join 键排序,Spark 可以在某些特定情况下避免部分 shuffle 操作(例如通过 Sort-Merge Join 优化)。
    • 这种优化通常适用于 已分区且排序 的数据。

结论
如果输入文件本身未分区,也未排序,对文件排序可能不会直接提升 join 性能,反而增加预处理的时间。如果文件分区已按 join 键对齐,排序后可以避免重复 shuffle,从而加速 join


2. Spark join 的实现类型

Spark 支持多种 join 策略,是否排序影响它选择的策略:

  1. Shuffle Hash Join(默认):

    • 没有预排序或分区信息时,Spark 会对两侧数据进行 shuffle,并构建哈希表。
    • 无需排序,但 shuffle 成本高。
  2. Broadcast Hash Join

    • 当一侧数据较小时,Spark 会将小表广播到每个节点,避免 shuffle。
    • 无需排序,直接使用内存中的哈希表。
  3. Sort-Merge Join(适用于已排序数据):

    • 如果两侧数据均已按 join 键排序并分区,则可以直接进行归并,避免 shuffle。
    • 排序后的数据可以触发此优化,但排序的成本需综合考虑。
  4. Cartesian Join(笛卡尔积,极少使用):

    • 对所有组合进行匹配,效率低下。
    • 排序不会影响此类型。

3. 什么时候排序有意义?

排序可能有意义的情况:

  • 数据量特别大,且可以多次复用
    如果文件被频繁用于按相同键的 join 操作,对数据进行排序和分区可以减少后续 join 的 shuffle 成本。

  • 数据来源支持排序和分区
    如果数据来自分布式存储(如 HDFS),并支持按 join 键提前分区和排序,能避免 Spark 在运行时的 shuffle 操作。

  • 大规模关联操作
    如果两个文件非常大,且系统内存有限,提前排序和分区可以使 join 更高效。


4. 排序与分区结合的优化

对数据进行排序和分区时,应综合以下操作:

  1. 分区优化

    • 使用 repartition()join 键分区:
      1
      2
      file1 = file1.repartition("key_column")
      file2 = file2.repartition("key_column")
  2. 排序优化

    • 使用 sortWithinPartitions() 对每个分区内数据排序:
      1
      2
      file1 = file1.sortWithinPartitions("key_column")
      file2 = file2.sortWithinPartitions("key_column")
  3. 确保数据分区对齐

    • 如果两个文件的数据分区数量不一致,会引发额外的 shuffle。
    • 使用 coalesce()repartition() 对齐分区数。

5. 总结与建议

  • 排序前的权衡:
    排序会增加预处理的计算时间。如果 join 是一次性任务,不建议先排序,因为 Spark 会自动选择高效的 join 策略(如 Broadcast Join 或 Shuffle Join)。

  • 排序后的收益:
    如果文件被频繁使用,提前按 join 键分区和排序能显著减少后续 shuffle 成本,提高多次 join 的性能。

  • 推荐实践:

    • 小表广播优化:如果其中一个文件较小(如小于几百 MB),优先选择 Broadcast Join。
    • 分区对齐:对于大表,确保分区按 join 键一致。
    • 数据复用:在需要多次 join 时,排序分区后缓存(persistcheckpoint)可降低总成本。

示例代码(分区和排序结合)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from pyspark.sql import SparkSession

# 初始化 SparkSession
spark = SparkSession.builder.appName("Optimized Join").getOrCreate()

# 加载 CSV 文件
file1 = spark.read.csv("file1.csv", header=True, inferSchema=True)
file2 = spark.read.csv("file2.csv", header=True, inferSchema=True)

# 分区并排序
file1 = file1.repartition("key_column").sortWithinPartitions("key_column")
file2 = file2.repartition("key_column").sortWithinPartitions("key_column")

# 执行 Sort-Merge Join
result = file1.join(file2, file1["key_column"] == file2["key_column"], "inner")

# 显示结果
result.show()

# 停止 SparkSession
spark.stop()

解释这行命令 ./bin/spark-submit --master local[4] SimpleApp.py

以下是对这行命令的详细解释:

1
./bin/spark-submit --master local[4] SimpleApp.py

1. 命令部分解析

(1) ./bin/spark-submit

  • 这是 Spark 提供的脚本,用于提交 Spark 应用程序以运行。
  • 它负责启动 Spark 集群环境(或本地环境),配置应用程序,加载依赖项,并运行指定的代码。

(2) --master local[4]

  • --master 指定 Spark 应用程序运行的部署模式,也就是 Spark 集群的主控节点地址

    • local[4] 表示在本地运行应用程序,并使用 4 个线程。

      • 本地模式是 Spark 的一种运行模式,适合开发、调试或小规模实验。
      • [4] 指定本地模式中使用的线程数量,通常对应于 CPU 核心数。
      • 例如,如果机器有 4 个 CPU 核心,local[4] 可以充分利用 CPU 资源。
    • 其他常见的选项:

      • local: 单线程运行(适合简单调试)。
      • local[*]: 使用机器上所有的 CPU 核心。
      • yarn: 提交到 YARN 集群运行。
      • spark://<master-url>: 提交到指定的 Spark 集群。
      • k8s://<k8s-master-url>: 提交到 Kubernetes 集群。

(3) SimpleApp.py

  • 这是用户编写的 Spark 应用程序脚本,使用 Python 编写。
  • 文件名 SimpleApp.py 只是一个示例,表示包含 Spark 程序逻辑的 Python 文件。

2. 综合含义

这行命令的完整含义是:

  • 使用 spark-submit 提交名为 SimpleApp.py 的 Spark 应用程序脚本。
  • 应用程序将在本地运行,并使用 4 个线程
  • 适合在开发环境中运行 Spark 程序,充分利用本地机器的资源。

3. 应用场景

本地模式优点

  • 无需搭建 Spark 集群。
  • 快速调试和开发。
  • 可以在单机上处理小规模数据。

局限性

  • 仅能使用本地机器资源,不适合处理大规模数据。
  • 无法测试分布式功能(如数据分区跨节点处理、集群级别的故障恢复等)。

4. 扩展

  • 如何指定更多参数:

    • 例如,设置应用名称和配置文件:
      1
      ./bin/spark-submit --master local[4] --name "MyApp" --conf spark.executor.memory=2g SimpleApp.py
  • 将应用提交到集群:

    • 提交到 Spark 独立集群:

      1
      ./bin/spark-submit --master spark://master-host:7077 SimpleApp.py
    • 提交到 YARN 集群:

      1
      ./bin/spark-submit --master yarn --deploy-mode cluster SimpleApp.py
    • 提交到 Kubernetes:

      1
      ./bin/spark-submit --master k8s://https://<k8s-master-url> --deploy-mode cluster SimpleApp.py

查看 java 进程

ps -ef | grep java | cut -c 1-100
lsof -p 3217885 2>/dev/null | awk '$4=="cwd"'

union和orderBy是否会占用太多内存,需要人工设置分区个数吗

在 PySpark 中,操作如 unionorderBy 确实会消耗较多内存和计算资源,尤其是在处理大数据集时。以下是对它们的性能和内存消耗的分析,以及是否需要设置分区数量的建议。


1. union 的内存使用分析

如何工作

  • union 会将两个或多个 DataFrame 合并成一个。合并时不进行去重,只是简单地拼接数据。
  • 默认情况下,union 操作不会重新分区,因此分区数量与输入的第一个 DataFrame 保持一致。

性能与内存消耗

  • 对于大数据集,union 操作会占用内存和磁盘 I/O。
  • 如果多个 DataFrame 的分区数差异较大,可能导致任务负载不均衡。

优化建议

  1. 设置合适的分区数量

    • 在合并前,确保所有 DataFrame 的分区数接近一致。
    • 如果数据量很大,可以通过 repartition(n) 显式调整分区数量。例如:
      1
      2
      3
      df1 = df1.repartition(100)  # 调整分区数为 100
      df2 = df2.repartition(100)
      merged_df = df1.union(df2)
  2. 减少重复分区调整

    • 如果后续操作如 orderBy 会自动重新分区,可以避免在 union 前重复调整分区。

2. orderBy 的内存使用分析

如何工作

  • orderBy 是一个宽依赖操作,需要将数据根据排序键进行全局排序。
  • 这通常需要对整个数据集进行 Shuffle,即将数据重新分区并排序。

性能与内存消耗

  • orderBy 是一个高成本操作,因为它需要:
    • 重新分区所有数据。
    • 将数据按照排序键进行全局排序。
  • 对大数据集,这种操作可能导致内存不足或性能瓶颈,尤其是在分区数不足的情况下。

优化建议

  1. 调整分区数量

    • orderBy 会触发 Shuffle 操作,增加分区可以减少单个分区的数据量,降低内存压力。
    • 例如:
      1
      sorted_df = merged_df.repartition(200).orderBy("domain")
  2. 使用 sortWithinPartitions

    • 如果不需要全局排序,可以使用 sortWithinPartitions 在每个分区内排序,避免全局 Shuffle。
      1
      sorted_df = merged_df.sortWithinPartitions("domain")
  3. 减少数据规模

    • 如果只是采样部分数据,可以使用 limit()sample() 方法减少排序数据量。

3. 是否需要手动设置分区数

  • 分区数的默认行为

    • 默认分区数为 spark.sql.shuffle.partitions,通常是 200。如果数据量很大或资源有限,可以手动调整。
  • 如何判断是否需要调整

    1. 监控任务的分区情况
      • 使用 Spark Web UI 查看分区大小和任务分布。如果某些分区数据量过大,可能需要增加分区。
    2. 查看运行内存和执行时间
      • 如果任务因为内存不足或 Shuffle 时间过长失败,可能需要增加分区。
  • 调整分区数的推荐值

    • 分区数通常与数据大小、集群资源和任务类型有关。一般经验:
      • 每个分区处理 128MB 至 256MB 的数据。
      • 分区数量 = 数据大小 / 每分区数据量。

group by

df.groupBy(“domain”) 会利用上文件已经按domian排序这个事实吗

在 PySpark 中,groupBy 操作默认情况下并不会直接利用文件已经按 domain 排序的事实。这是因为:

  1. groupBy 的机制

    • groupBy 是一个宽依赖操作,它需要所有相同键(如 domain)的数据集中到相同的分区中进行分组。这通常会触发 Shuffle 操作,将数据重新分布到各个分区。
  2. 已排序文件的优势未被使用

    • 即使文件已经按 domain 排序,PySpark 不会在默认情况下将排序信息传递给 groupBy,因此仍然会触发 Shuffle。
    • PySpark 无法推断输入文件的排序信息,除非我们明确告知。

如何利用已排序文件的事实?

如果输入文件已经按 domain 排序,可以通过以下方法优化分组操作:


1. 使用 sortWithinPartitions

在数据读取后,通过 sortWithinPartitions 保持每个分区内的排序,然后按键分组。因为每个分区内的键是有序的,分组的开销会较小。

1
2
3
4
5
# 保持分区内的排序
sorted_df = df.sortWithinPartitions("domain")

# 按 domain 分组统计
domain_counts = sorted_df.groupBy("domain").agg(count("domain").alias("count"))

优点

  • 保留分区内排序信息,减少分组过程中的局部 Shuffle。

2. 使用 RDD 并通过文件排序优化

如果完全控制输入文件的排序和分区,可以使用 RDD 的 combineByKey,避免全局 Shuffle。以下是使用 RDD 的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 将 DataFrame 转换为 RDD
rdd = df.rdd.map(lambda row: (row["domain"], 1))

# 按 domain 聚合,利用已排序的特点
domain_counts_rdd = (
rdd.combineByKey(
lambda value: value, # 创建初始值
lambda acc, value: acc + value, # 聚合值
lambda acc1, acc2: acc1 + acc2 # 合并分区
)
)

# 转回 DataFrame
domain_counts = domain_counts_rdd.toDF(["domain", "count"])

优点

  • 如果输入文件按 domain 排序且分区正确,可以完全避免 Shuffle。

3. 确保文件按分区排序

如果可以控制数据的写入方式,建议在文件生成时:

  1. domain 分区并排序。
  2. 使用 repartitionByRange 写入文件,例如:
1
2
3
# 写入排序后的文件,按 domain 分区
sorted_df = df.repartitionByRange(10, "domain").sortWithinPartitions("domain")
sorted_df.write.csv("sorted_output.csv", header=True)

读取时直接利用分区结构,可以显著减少 Shuffle。


优化建议总结

  • 小数据集
    默认的 groupBy 是简洁的选择,不需要额外优化。

  • 大数据集(已按键排序)

    • 使用 sortWithinPartitions 或 RDD 操作减少 Shuffle。
    • 确保数据按键分区并保持排序。
  • 大数据集(未按键排序)

    • 使用 repartitionByRange 提前对数据分区。
    • 使用合适的分区数(repartitioncoalesce)。

详细解释combineByKey的第二种方法

combineByKey 是 RDD 的一种高效聚合方法,适用于分布式数据处理场景,特别是当你需要按照键对数据进行聚合时。

以下是对 combineByKey 的第二种方法的详细解释,以及它如何工作。


combineByKey 的机制

combineByKey 需要三个函数来完成分区内和分区间的聚合操作:

  1. 创建初始值(createCombiner

    • 当某个键第一次出现时,如何初始化其值。
    • 比如,对于 (key, value),你可以将 value 包装为一个更复杂的数据结构(如计数器、列表等)。
  2. 分区内合并(mergeValue

    • 当相同分区中的某个键再次出现时,如何将新值与当前累积值合并。
  3. 分区间合并(mergeCombiners

    • 当不同分区中累积的结果需要合并时,如何将它们合并。

应用场景

假设我们有以下数据集,统计每个 domain 的出现次数:

1
[("gmail.com", 1), ("yahoo.com", 1), ("gmail.com", 1), ("outlook.com", 1)]

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from pyspark import SparkContext

sc = SparkContext("local", "combineByKey Example")

# 示例数据
rdd = sc.parallelize([
("gmail.com", 1),
("yahoo.com", 1),
("gmail.com", 1),
("outlook.com", 1),
("gmail.com", 1)
])

# 使用 combineByKey 聚合
result = rdd.combineByKey(
lambda value: value, # createCombiner
lambda acc, value: acc + value, # mergeValue
lambda acc1, acc2: acc1 + acc2 # mergeCombiners
)

# 将结果收集并打印
print(result.collect())

三个函数的详细解释

1. createCombiner

  • 描述:当某个键第一次被遇到时,createCombiner 会生成该键的初始值。
  • 示例:将值初始化为自身,比如将 ("gmail.com", 1)1 保持为初始值。

代码中对应:

1
lambda value: value

如果键是 "gmail.com" 且值是 1,那么 combineByKey 内部会为 "gmail.com" 创建一个累积器,初始值为 1


2. mergeValue

  • 描述:定义如何将相同分区内的新值与当前累积值合并。
  • 示例:累加新值到已有值,比如 1 + 1

代码中对应:

1
lambda acc, value: acc + value

如果当前分区的键 "gmail.com" 已有累积值 2,并遇到新值 1,那么累积器会更新为 3


3. mergeCombiners

  • 描述:定义如何将不同分区中累积的结果合并。
  • 示例:累加来自不同分区的累积值,比如 3 + 2

代码中对应:

1
lambda acc1, acc2: acc1 + acc2

如果两个分区的键 "gmail.com" 的累积值分别是 32,那么合并后结果是 5


如何工作(流程解释)

  1. 初始阶段

    • 对于每个分区内的键值对,第一次遇到某个键时,调用 createCombiner 初始化该键的累积值。
    • 例如:
      1
      ("gmail.com", 1) -> 初始值为 1
  2. 分区内聚合

    • 每个分区独立处理自己的数据,对于相同的键,使用 mergeValue 累加。
    • 示例(分区1的数据):
      1
      ("gmail.com", 1), ("gmail.com", 1) -> 初始值 1 -> 累加值为 2
  3. 分区间聚合

    • 各分区处理完后,Spark 使用 mergeCombiners 将不同分区的结果合并。
    • 示例:
      • 分区1:("gmail.com", 2)
      • 分区2:("gmail.com", 1)
      • 合并结果:("gmail.com", 3)

输出示例

对于以下输入数据:

1
[("gmail.com", 1), ("yahoo.com", 1), ("gmail.com", 1), ("outlook.com", 1), ("gmail.com", 1)]

代码会输出:

1
[('yahoo.com', 1), ('gmail.com', 3), ('outlook.com', 1)]

性能优势

combineByKey 的核心优势在于:

  1. 分区内高效聚合

    • 通过 mergeValue 在分区内提前聚合,减少数据量。
  2. 分区间高效合并

    • 在分区间只需传输聚合结果,而不是所有数据。

这使得 combineByKey 非常适合键值对数据的大规模聚合。


改进和扩展

  • 优化内存使用
    如果键值对的结果较大,可以在 createCombinermergeValue 中使用高效数据结构(如计数器或集合)。

  • 统计复杂信息
    可以使用 combineByKey 计算平均值、最大值等。例如:

    1
    2
    3
    4
    5
    rdd.combineByKey(
    lambda value: (value, 1), # 初始值 (总和, 个数)
    lambda acc, value: (acc[0] + value, acc[1] + 1), # 分区内累加
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]) # 分区间合并
    ).mapValues(lambda x: x[0] / x[1]) # 计算平均值

这种方式可以灵活地计算更复杂的统计信息。


总结

  • combineByKey 是一种分布式键值对聚合操作,通过三步(创建初始值、分区内合并、分区间合并)高效处理大规模数据。
  • 当数据已经按键排序时,可以进一步优化性能,减少 Shuffle 操作。
  • 它非常适合复杂聚合任务,如计算计数、平均值等。

一个csv有两列,一个性别列,一个年龄列,用spark写一段代码按性别分组,然后统计每个分组中每个年龄段的人数,年龄段每10岁划分为一个段。

计算年龄段的代码使用udf,原始数据已经按照性别排序,有可能性别有数据倾斜,数据大约10亿,服务器内存较少,给出完整的代码。

考虑到你的数据量非常大(约10亿条数据),而且服务器内存较少,以下是使用 Spark 进行高效处理的完整代码,结合了以下优化策略:

  1. 使用 UDF 来计算年龄段
  2. 避免全局 Shuffle,尽量减少 Spark 操作的开销
  3. 根据性别分区(避免性别数据倾斜),减少不必要的 shuffle 操作。
  4. 使用合适的内存管理和分区策略,保证 Spark 程序的性能。

优化思路

  • UDF计算:通过定义自定义 UDF 来计算每个数据的年龄段。
  • 按性别分区:使用 repartitionByRange 或者 partitionBy 按性别分区,可以减少后续的 Shuffle 操作,防止性别列的数据倾斜。
  • 缓存中间数据:如果需要多次计算,可以缓存中间结果,避免重复计算。
  • 调整分区数:如果数据量非常大,可以调整 spark.sql.shuffle.partitions 配置来减少 shuffle 的开销。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, concat, lit, floor
from pyspark.sql.types import StringType
from pyspark.sql import functions as F

# 初始化 SparkSession
spark = SparkSession.builder \
.appName("AgeGroupByGenderWithUDF") \
.config("spark.sql.shuffle.partitions", "200") \ # 根据实际情况调整
.config("spark.executor.memory", "4g") \ # 根据内存情况调整
.config("spark.executor.cores", "4") \ # 根据可用的 CPU 核心数调整
.getOrCreate()

# 示例数据路径(假设是 CSV 文件,实际情况中应该读取你的大数据源)
# 数据示例:每行包括性别和年龄
input_file = "path_to_your_data.csv"

# 读取数据,假设 CSV 有两列: gender 和 age
df = spark.read.option("header", "true").csv(input_file).select("gender", "age")

# 将年龄列转为整型
df = df.withColumn("age", df["age"].cast("int"))

# 定义 UDF 来计算年龄段
def age_group_logic(age):
if age is None:
return None
# 计算年龄段,例如: 20-29, 30-39
start_age = (age // 10) * 10
end_age = start_age + 9
return f"{start_age}-{end_age}"

# 注册 UDF
age_group_udf = udf(age_group_logic, StringType())

# 添加年龄段列
df_with_age_group = df.withColumn("age_group", age_group_udf(col("age")))

# 按性别分区(避免数据倾斜),通过 repartitionByRange 按性别排序
df_partitioned = df_with_age_group.repartitionByRange(200, "gender")

# 缓存中间结果,避免重复计算
df_partitioned.cache()

# 按性别和年龄段分组并统计人数
result = df_partitioned.groupBy("gender", "age_group").agg(F.count("*").alias("count"))

# 排序以方便查看
result = result.orderBy("gender", "age_group")

# 显示结果
result.show(truncate=False)

# 保存结果(可以根据需求选择保存格式)
# result.write.csv("output_path", header=True)

代码解释:

  1. SparkSession:通过 SparkSession.builder 设置了适当的配置,主要配置了:

    • spark.sql.shuffle.partitions: 调整 shuffle 操作的分区数。此参数需要根据实际数据量和集群大小进行调整。默认值通常为200,可以适当调整。
    • spark.executor.memory: 根据服务器内存大小调整。如果内存不足,可以适当减少。
    • spark.executor.cores: 设置每个 executor 的 CPU 核心数,防止过多的并行任务导致资源不足。
  2. 读取数据:从 CSV 文件中读取数据,并选择性地保留 genderage 列。如果你的数据存储格式不同,可以调整读取方法。

  3. 年龄段计算:定义了一个 age_group_logic 函数,并通过 udf 注册它。UDF 将每个年龄值转换为对应的年龄段,例如 20-2930-39

  4. 数据分区:通过 repartitionByRange(200, "gender")gender 列进行数据分区,尽量减少后续分组操作时的数据倾斜。

  5. 缓存数据df_partitioned.cache() 缓存了中间结果,如果后续需要多次使用 df_partitioned,避免重复计算。

  6. 分组和统计:使用 groupBy("gender", "age_group").agg(F.count("*").alias("count")) 按性别和年龄段进行分组统计人数。

  7. 排序输出:通过 orderBy("gender", "age_group") 排序,方便查看输出结果。


性能考虑:

  1. 避免全局 Shuffle:我们尽量减少了不必要的 groupBy 操作,通过 repartitionByRange 来避免数据倾斜和不必要的 Shuffle。
  2. 分区数量调整spark.sql.shuffle.partitions 的调整能够有效控制 Spark 执行期间的 Shuffle 操作数量,避免大规模数据处理时过多的 Shuffle 操作带来性能瓶颈。
  3. 内存与计算资源配置:根据实际内存和 CPU 核心数来合理配置 spark.executor.memoryspark.executor.cores,避免过多的并行计算导致资源过度消耗。
  4. 缓存中间结果cache() 用来缓存已经处理过的数据,减少后续计算时的开销。

结果展示:

运行这段代码时,result.show(truncate=False) 会展示按性别和年龄段分组统计的结果。例如:

1
2
3
4
5
6
7
8
9
10
+------+---------+-----+
|gender|age_group|count|
+------+---------+-----+
|Female|10-19 |200000|
|Female|20-29 |350000|
|Female|30-39 |500000|
|Male |10-19 |150000|
|Male |20-29 |400000|
|Male |30-39 |700000|
+------+---------+-----+

根据实际需求,可以选择将结果保存到文件中,使用 result.write.csv("output_path", header=True) 将结果输出到 CSV 文件,或者选择其他格式(如 Parquet)进行存储。


总结:

此代码通过 UDF 计算年龄段,并采用按性别分区、减少 Shuffle、缓存等手段来优化性能。根据数据量和集群资源的不同,可以进一步调整分区数、内存和 CPU 核心数等配置,确保 Spark 程序高效运行。


为什么df_with_age_group.repartitionByRange(200, “gender”)可以避免数据倾斜

在 Spark 中,数据倾斜(Data Skew)通常发生在 groupByjoinrepartition 等操作中,特别是当某些分区的数据量明显大于其他分区时,可能会导致任务执行不均衡,从而拖慢整体处理速度。

repartitionByRange 通过范围划分的方式对数据进行分区,而不是随机划分。这样,分区的方式更加均匀,可以有效避免数据倾斜。让我们详细解释一下:

1. repartitionByRange 的工作原理

repartitionByRange 根据给定的列(比如 gender 列)对数据进行排序和重新分区。它通过对数据进行排序(按指定列或多个列排序)来确保相同值的记录被分配到同一个分区中。

  • 你提供的参数是:200(目标分区数)和 "gender"(需要按这个列来重新分区)。
  • 这个操作会尝试通过排序的方式来将数据分配到不同的分区中,保证每个分区内的数据量尽可能均匀。

例如,如果 gender 列只有两个值 "Male""Female"repartitionByRange(200, "gender") 会尝试将 "Male""Female" 的数据均匀地分配到 200 个分区中。这有助于避免某个分区的“热点”,从而避免倾斜。

2. 为什么避免数据倾斜?

数据倾斜通常发生在某些值(例如某些 gender 或某些键)上出现大量的数据,导致分配到某些分区的任务非常繁重,而其他分区的任务较少。这样会导致计算资源的不均衡,进而影响作业性能。

repartitionByRange 对数据进行范围划分时,会确保相同范围的值集中在相同的分区中,从而减小了某个分区被单一大值(比如某个很大的 "Male""Female" 数量)占据的可能性。假设数据在 gender 上不平衡(例如 "Male""Female" 多得多),repartitionByRange 会确保在每个分区内 MaleFemale 数据量分布较为均衡。

3. 避免 groupByjoin 操作中的倾斜

  • groupBy:如果我们直接使用 groupBy("gender"),那么当某个性别的数量极大时,会导致某个分区上的数据过于集中,形成“热点”,从而影响性能。
  • join:在进行 join 操作时,若两边某些键的数量极其不均衡,也可能造成某些分区的数据量很大,导致数据倾斜。通过 repartitionByRange 可以使得数据的分布更加均匀。

4. 示例:repartitionByRange 可以帮助避免倾斜

假设你有如下的 gender 列数据,且数据分布非常不均衡:

  • 80% 的数据是 Male
  • 20% 的数据是 Female

如果你直接进行 groupBy("gender"),那么会把大部分数据集中到一个分区中,导致 Spark 在该分区上工作量过大,其他分区的任务则相对较小,这样会产生数据倾斜,导致性能下降。

然而,如果你使用 repartitionByRange(200, "gender")

  • 这个操作会根据 gender 列的值将数据范围划分并均匀分配到 200 个分区中。
  • 即使 MaleFemale 数据量不均,repartitionByRange 会尽量平衡两个值的分配,并使每个分区的数据量相对接近,从而避免某些分区的数据过多,而其他分区的数据较少。

5. repartition 的区别

  • repartition基于随机化的重新分区操作,它可能导致某些值过于集中到某些分区,特别是在数据分布不均的情况下,这可能会导致数据倾斜。
  • repartitionByRange基于排序的重新分区,它通过对数据进行排序并按值范围划分分区,从而保证数据在分区之间更加均匀分布。

总结

repartitionByRange(200, "gender") 可以帮助避免数据倾斜,主要是通过按范围对数据进行划分,使得相同值的记录尽量分配到相同的分区,从而减少数据量不均匀的情况,进而提高任务的执行效率和资源利用率。


原始数据中gender是否排序对repartitionByRange(200, “gender”)的性能有影响吗?

在 Spark 中,repartitionByRange 通过对数据进行排序来决定如何重新分区。当使用 repartitionByRange(200, "gender") 时,Spark 会尝试将 gender 列的不同值按范围进行排序并均匀分配到 200 个分区中。

1. repartitionByRange 依赖排序,但原始数据是否已排序不会影响最终结果

  • 排序的本质repartitionByRange 会对数据进行排序,然后基于排序的结果重新分区,因此即使原始数据中 gender 列没有排序,Spark 也会对数据进行排序。原始数据是否已经排序对最终的性能不会产生直接影响,因为 Spark 会在进行数据重新分区时对数据进行完整的排序。
  • 排序的影响:如果原始数据已经按 gender 列排序,Spark 可能会在内部做一些优化,例如避免进行全量的排序步骤,从而在某些情况下节省了部分计算资源。但即使数据没有排序,Spark 也能通过 repartitionByRange 完成排序和重新分区操作。

2. 为什么原始数据排序可能对性能有间接影响

尽管 repartitionByRange 会强制执行排序操作,但在某些情况下,原始数据的排序状态可能影响执行效率。具体来说,以下是几个可能的影响:

  • 数据预排序可能减少不必要的 Shuffle:如果原始数据已经按照 gender 排序,Spark 在执行 repartitionByRange 时可能会有更少的需要移动的数据,因为已经按照一定顺序进行分配。这可能减少内部 shuffle 的开销,因为 Spark 在执行 repartitionByRange 时会根据 gender 的值划分区间,若数据已经按 gender 排序,则可以减少重新排序的计算量。

  • 更低的计算开销:当数据已经按某列(如 gender)排序时,Spark 在进行 repartitionByRange 操作时,可能会减少一部分工作量(例如,减少对整个数据集的排序操作)。这对于较小的分区数和数据量可能是有益的,但对于大规模数据,效果可能不那么明显。

3. 影响的主要因素:数据的规模与分布

  • 数据量:对于较小的 DataFrame,数据已经排序对 repartitionByRange 的性能提升可能不明显。对于较大的数据集,原始数据是否排序可能会有较大的影响,因为排序操作会带来额外的计算开销。
  • 数据分布:如果 gender 列的分布非常不均匀(例如,一些 gender 值占比非常大,其他值非常小),即使数据已经排序,repartitionByRange 可能仍然需要进行大量的 shuffle 操作来保证分区的均匀性。

4. 总结

  • 性能差异:原始数据是否已排序对 repartitionByRange 的性能有一定影响,但影响主要取决于数据的规模和分布。对于非常大的数据集,数据是否排序可能会对计算和 Shuffle 的效率产生影响。
  • 排序操作:即使原始数据没有排序,repartitionByRange 也会对数据进行排序,因此排序本身的开销始终存在。原始数据的排序状态可能只是优化了排序过程的效率,而不会改变最终的结果。

最佳实践:

  • 对于大数据量的情况,推荐始终使用 repartitionByRange 来避免数据倾斜,哪怕原始数据没有排序。
  • 对于小规模数据,如果数据已经按关键列排序,可以考虑直接使用 repartitionByRange,可能会节省一些计算开销。

在线收缩表

gh-ost 是一个非常好的选择,特别是对于需要在生产环境中对大表进行修改、重建或优化时。它是一个由 GitHub 开发的 在线模式表结构更改工具,能够在不锁定表的情况下执行表重建、数据迁移等操作,因此可以用于你的场景——优化表以回收空间,且不影响在线业务。

使用 gh-ost 执行在线优化

gh-ost 通常用于表结构的变更,但也可以用来优化和重建大表,并能够确保操作过程中的最小锁定和最大在线可用性。

安装 gh-ost

  • 下载预编译二进制文件:
    你可以从 GitHub releases 页面下载适合你操作系统的版本:GitHub Releases

使用 gh-ost 来在线优化表

  1. 创建临时表并执行数据迁移
    gh-ost 会创建一个临时表,并且逐步将原表的数据迁移到新表中,整个过程不会锁定原表。

  2. 执行 gh-ost 命令
    假设你要优化的表是 xxx,并且你希望通过 gh-ost 来重建表,并且回收空间。你可以通过以下步骤来实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    gh-ost \
    --user="root" \
    --password="yourpassword" \
    --host="localhost" \
    --database="your_database" \
    --table="xxx" \
    --alter="ENGINE=InnoDB" \
    --approve-renames \
    --chunk-size=1000 \
    --max-load=Threads_running=25 \
    --critical-load=Threads_running=50 \
    --initially-drop-ghost-table \
    --execute

    解释参数:

    • --user--password--host--database:用于连接 MySQL 数据库。
    • --table:指定要优化的表名。
    • --alter="ENGINE=InnoDB":用 ALTER 操作改变表的引擎或做其它修改(你可以不做任何结构改变,只是进行表重建来回收空间)。
    • --approve-renames:确认使用临时表重命名操作。
    • --chunk-size=1000:迁移数据时,按块大小处理数据,避免一次性迁移过多数据造成性能压力。
    • --max-load:指定迁移过程中 MySQL 最大允许的负载(比如最大线程数)。
    • --critical-load:当系统负载超过此限制时,gh-ost 会暂停迁移,以避免影响性能。
    • --initially-drop-ghost-table:开始时就丢弃临时表,以减少空间占用。
    • --execute:表示实际执行操作。如果你先想模拟执行过程,可以先去掉 --execute,这将执行干运行并显示预期的操作步骤。

gh-ost 的工作方式

  • gh-ost 在执行过程中会:
    • 创建一个临时的 “ghost” 表。
    • 使用 MySQL 的 INSERT INTO ... SELECT 或者其他方法将原表的数据逐步迁移到 “ghost” 表。
    • 逐渐迁移数据时,只会在数据块大小(--chunk-size)的粒度上短暂加锁,因此对系统影响较小。
    • 当迁移完成后,会进行原表和临时表的切换(将新表重命名为原表的名字)。
    • 这样,你就能在不中断应用的情况下重建和优化大表。

适用场景

  • 不影响业务gh-ost 能够避免锁定整个表,确保表的数据可以在业务运行时继续读取和写入。
  • 回收空间:通过改变表的存储引擎、重建表等方式,可以回收空间、减少碎片。
  • 高性能gh-ost 通过分块方式逐步进行数据迁移,并且支持动态调整迁移速度,避免对数据库性能造成过大影响。

计算隐含波动率

如果已经获取到期权的行权价,到期时间,价格,底层地产的价格,无风险利率,能否计算出它的隐含波动率,期权价格应该用最新的成交价,还是对手盘的报价?


是的,您可以使用这些信息计算期权的隐含波动率。隐含波动率(IV)是通过反向求解Black-Scholes公式得到的,具体步骤如下:

1. 选择期权价格

期权价格可以使用以下两种数据来源:

  • 最新成交价:反映了市场上最近的交易价格,可能包含最新的市场信息。
  • 对手盘报价:包括买价(Bid)和卖价(Ask),代表市场上可交易的价格范围。

建议:如果要精确反映市场情况,可以用买卖盘中点价(即((Bid + Ask) / 2))。如果市场活跃且最新成交价可信,也可以直接使用成交价。

2. 计算隐含波动率的步骤

隐含波动率需要通过迭代法反向求解,具体流程如下:

  1. 已知参数代入Black-Scholes公式

    • 行权价 ((K))。
    • 到期时间 ((T))。
    • 当前底层资产价格 ((S_0))。
    • 无风险利率 ((r))。
    • 市场上期权价格 ((C_{market}) 或 (P_{market}))。
  2. 设定初始隐含波动率值

    • 通常从一个较低的值(例如 0.1 或 10%)开始。
  3. 用Black-Scholes公式计算理论期权价格

    • 使用假定的隐含波动率,计算理论期权价格 ((C_{BS}) 或 (P_{BS}))。
  4. 比较理论价格与市场价格

    • 如果理论价格与市场价格的差距在容许范围内(例如(\epsilon = 10^{-6})),则认为找到隐含波动率。
    • 如果差距较大,调整隐含波动率(例如使用二分法或牛顿-拉夫森法)并重复计算。

3. Black-Scholes公式回顾

  • 看涨期权价格:
    [
    C = S_0 \cdot N(d_1) - K \cdot e^{-rT} \cdot N(d_2)
    ]
  • 看跌期权价格:
    [
    P = K \cdot e^{-rT} \cdot N(-d_2) - S_0 \cdot N(-d_1)
    ]
  • (d_1) 和 (d_2) 的定义:
    [
    d_1 = \frac{\ln(S_0 / K) + (r + \sigma^2 / 2)T}{\sigma \sqrt{T}}, \quad d_2 = d_1 - \sigma \sqrt{T}
    ]
    其中,(\sigma) 是隐含波动率。

4. 常用求解方法

  • 二分法
    在给定区间内(例如 ([0, 1])),逐步缩小隐含波动率范围。
  • 牛顿-拉夫森法
    使用Black-Scholes公式对(\sigma)的导数进行迭代更新,收敛更快。

总结:您可以根据市场需求选择成交价或中点价作为输入,利用迭代法通过Black-Scholes公式反算隐含波动率。这是一种标准的金融工程方法,用于评估市场对未来波动的预期。


二分法真是无处不在,但我比较纳闷的是,为什么这里可以用二分法呢,曲线是单调平滑的才可以用二分吧,否则二分应该有可能跳过最优解。

您说得非常对,二分法在数值求解中之所以有效,关键在于问题满足特定的单调性连续性条件。在隐含波动率的求解中,二分法可以使用的原因如下:


1. Black-Scholes公式中的单调性

在给定其他参数(如标的价格 (S)、行权价 (K)、到期时间 (T)、无风险利率 (r))不变的情况下,期权价格对隐含波动率 (\sigma) 是单调递增的:

  • 随着波动率的增加,期权价格总是增加。这是因为更高的波动率意味着标的资产价格的不确定性更高,从而增加了期权的价值。

这种单调性保证了期权价格曲线(相对于波动率)没有局部极值,只有一个唯一的解。这为二分法提供了理论基础。


2. 平滑性

Black-Scholes公式是连续且可微的。期权价格关于隐含波动率的导数(称为 Vega)始终为正,且在实用范围内(低波动率到高波动率)变化平滑。

Vega 的公式

Vega 是期权价格对隐含波动率的敏感性,公式为:
[
\text{Vega} = S_0 \cdot \sqrt{T} \cdot N’(d_1)
]
其中 (N’(d_1)) 是标准正态分布的概率密度函数。

Vega 总是非负的,并且其值在波动率较低时较大,波动率较高时逐渐减小。这种平滑性意味着隐含波动率与期权价格之间的关系是良好行为的(单调且无振荡),因此适合用二分法。


3. 为何不会跳过最优解

二分法的基本思想是每次选择解区间的中点,利用单调性将搜索区间缩小一半。在隐含波动率的求解中,由于以下性质,解区间总是收敛到唯一解:

  • 单调性:期权价格随隐含波动率单调增加。
  • 连续性:期权价格对隐含波动率的映射是连续的。
  • 边界条件:波动率为0时,期权价格接近其内在价值;波动率趋于无穷时,期权价格趋于无穷。

因此,二分法从一个足够大的初始区间(如 ([0, 5]))开始,可以保证找到市场价格的隐含波动率。


4. 如果曲线不是单调的会怎样?

如果曲线不是单调的(例如存在局部极值点),二分法可能会失效:

  • 二分法可能跳过最优解。
  • 可能无法收敛到正确的结果。

但在隐含波动率的求解中,这种情况不会发生,因为单调性和连续性已经从理论上保证了解的唯一性。


总结

二分法适用于隐含波动率的求解,是因为:

  1. Black-Scholes公式中期权价格对隐含波动率是单调递增的。
  2. 这种关系是平滑且连续的。
  3. 二分法的迭代过程可以保证找到唯一解。

这也是为什么二分法在数值求解中广泛应用的核心原因之一!


有没有python的库直接计算出隐含波动率,给出示例代码

Python中有多个库可以直接计算隐含波动率,常用的包括 py_vollibmibian。以下是使用这些库的示例代码。


方法 1: 使用 py_vollib

py_vollib 是一个功能强大的金融计算库,支持隐含波动率的计算。

安装

1
pip install py_vollib

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
import py_vollib.black_scholes.implied_volatility as iv

# 参数
option_price = 10.0 # 市场期权价格
S = 100.0 # 当前标的资产价格
K = 105.0 # 行权价
T = 0.5 # 到期时间(以年计)
r = 0.05 # 无风险利率
flag = 'c' # 看涨期权:'c',看跌期权:'p'

# 计算隐含波动率
implied_vol = iv.implied_volatility(option_price, S, K, T, r, flag)
print(f"隐含波动率: {implied_vol:.2%}")

方法 2: 使用 mibian

mibian 是另一个简单的工具库,适合快速计算隐含波动率。

安装

1
pip install mibian

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
import mibian

# 参数
S = 100 # 当前标的资产价格
K = 105 # 行权价
r = 5 # 无风险利率(百分比形式)
T = 30 # 到期时间(以天计)
option_price = 10 # 市场期权价格

# 使用 BSM 模型计算隐含波动率
bsm = mibian.BS([S, K, r, T], callPrice=option_price)
print(f"隐含波动率: {bsm.impliedVolatility:.2f}%")

方法 3: 自行实现迭代法(例如二分法)

如果需要完全控制计算过程,可以实现自定义的隐含波动率计算。

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from scipy.stats import norm
import numpy as np

# Black-Scholes定价公式
def black_scholes(S, K, T, r, sigma, option_type='call'):
d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
d2 = d1 - sigma * np.sqrt(T)
if option_type == 'call':
return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)
elif option_type == 'put':
return K * np.exp(-r * T) * norm.cdf(-d2) - S * norm.cdf(-d1)

# 隐含波动率的计算
def implied_volatility(option_price, S, K, T, r, option_type='call', tol=1e-6, max_iter=100):
low, high = 1e-6, 5.0 # 波动率范围
for _ in range(max_iter):
mid = (low + high) / 2
price = black_scholes(S, K, T, r, mid, option_type)
if abs(price - option_price) < tol:
return mid
elif price < option_price:
low = mid
else:
high = mid
return mid

# 参数
S = 100 # 标的资产价格
K = 105 # 行权价
T = 0.5 # 到期时间
r = 0.05 # 无风险利率
option_price = 10 # 市场期权价格

# 计算隐含波动率
iv = implied_volatility(option_price, S, K, T, r, option_type='call')
print(f"隐含波动率: {iv:.2%}")

比较

特点 推荐场景
py_vollib 专业金融工具库,支持多个选项和精度高 复杂期权定价与分析
mibian 简单易用,适合基础场景 快速验证隐含波动率
自行实现 完全可控,可自定义精度和算法 学术研究或高级控制需求