Solr+Spark构建实时AB测试系统:解决分组漂移与跨会话归因

Solr+Spark构建实时AB测试系统:解决分组漂移与跨会话归因
1. 项目概述当搜索架构遇上大数据计算AB测试不再只是“改个按钮看点击率”“Solr Spark AB Testing on Steroids”——这个标题不是营销噱头而是我在电商中台团队落地真实场景后写下的技术手记。它直指一个长期被低估的痛点传统AB测试平台比如基于MySQL前端埋点简单聚合的方案在面对高并发搜索行为、多维度流量分层、实时策略灰度和长周期用户路径归因时几乎必然陷入三重失能——数据延迟大、分组不正交、归因链条断。而Solr和Spark的组合恰恰在各自最硬核的能力边界上完成了精准互补Solr不是只做“搜”它本质是一个高性能、可插拔、支持复杂查询与实时分面统计的文档型分析引擎Spark也不是只做“批”它的Structured Streaming和Delta Lake生态已让亚秒级流批一体归因成为生产级可行选项。我们用这套组合在618大促前两周上线了搜索排序策略的AB验证系统将一次完整策略迭代的验证周期从平均5.3天压缩到11小时且首次实现了“用户搜索→点击→加购→下单→复购”的全漏斗跨会话归因。如果你正在为搜索推荐、广告投放或内容分发类业务的策略验证发愁或者你的AB平台还在用定时SQL跑昨天的数据那这篇就是为你写的实操复盘。它不讲概念只拆解我们怎么把Solr的facet pivot、query-time join和Spark的watermark机制、stateful processing真正拧在一起干活。2. 架构设计与技术选型逻辑为什么是SolrSpark而不是ElasticsearchFlink2.1 核心矛盾倒逼架构重构传统AB平台的三大硬伤我们先说清楚问题再谈方案。旧系统用的是典型的“埋点→Kafka→Flink实时清洗→MySQL写入→BI工具查表”链路。上线半年后三个致命问题集中爆发第一流量分组漂移严重。Flink按用户ID哈希分桶但用户换设备、清缓存、多端登录时同一用户在不同会话被分到不同实验组。我们发现某次搜索排序实验中37%的“实验组用户”在24小时内又出现在对照组日志里。这不是统计噪声是分组逻辑缺陷。第二归因窗口僵化。所有转化都按固定7天窗口统计但搜索场景下用户“搜A商品→看详情→放弃→三天后搜B商品→下单”很常见。旧系统无法关联跨搜索会话的行为把B商品的成交错误归因给A商品的搜索实验。第三策略配置与数据验证脱节。运营在后台改一个Solr的edismax boost参数要等2小时后数据同步到MySQL再等1小时BI刷新报表才能看到首屏点击率变化。中间任何环节出错排查成本极高。这三个问题本质是状态管理、时间语义和查询能力的三重缺失。而SolrSpark的组合恰好在每个缺口处提供了工业级成熟解法。2.2 Solr为何不可替代不只是搜索引擎更是实时分面计算引擎很多人一提Solr就想到“全文检索”但在AB测试场景我们重度依赖它的三项非搜索能力Facet Pivot的嵌套分面能力。传统count(*)只能告诉你“实验组总点击量”而facet.pivotexperiment_group,search_query,click_position能直接输出“实验组中搜‘iPhone 15’的用户首屏第3位点击量是217次”。这省去了90%的预聚合ETL让运营能实时下钻到具体词、具体位置。Query-time Join的轻量关联能力。我们的用户标签如新客/老客/高净值存在HBase里但AB分析需要实时关联。Solr的join fromusers.id tosearch_logs.user_id语法比Spark SQL的shuffle join快3倍以上且无需提前建宽表。实测单次查询10万条日志关联百万级用户标签P95延迟800ms。Realtime Get API的毫秒级状态读取。用户进入搜索页时前端JS通过/get?idU123456实时获取该用户当前所属实验组、版本号、生效时间戳。这个API不走Lucene索引直接查Solr的in-memory docValuesQPS 2万时P9915ms。这是保证“同用户同会话始终在同组”的底层基石。提示这里的关键认知转变是——Solr在本项目中不承担存储原始日志的职责它只存两样东西1经过清洗的、带实验组标签的搜索会话摘要schema定义为id, user_id, experiment_group, query, click_positions[], timestamp2用户实时分组状态快照schema为id, experiment_group, version, start_time, expire_time。原始日志全部进HDFS/S3由Spark处理。2.3 Spark为何胜出流批一体归因的工程确定性我们对比过Flink和Spark Structured Streaming最终选Spark核心基于三点硬性指标Watermark机制的确定性更强。Flink的watermark是per-key的而搜索AB中我们需要对“用户ID实验组”二元组设置统一水位线。Spark的withWatermark(event_time, 2 hours)配合groupBy(user_id, experiment_group)能严格保证只要事件时间戳比当前水位早2小时就绝不参与计算。Flink在key分布不均时会出现watermark滞留导致部分用户归因延迟。Stateful Processing的容错更透明。我们的归因逻辑需要维护“用户最近一次搜索会话ID”作为state。Spark的mapGroupsWithStateAPI要求你明确定义timeout、state schema和更新函数所有state变更都落盘到RocksDBCheckpoint。而Flink的ValueState在重启时可能丢失未flush的变更我们在压测中发现过0.3%的会话ID错乱。与Delta Lake的深度集成。归因结果要写回供BI分析Delta Lake的ACID事务、time travel和Z-ordering让我们能安全地做“按天分区按实验组聚簇”的写入。比如OPTIMIZE events DYNAMIC PARTITION OVERWRITE WHERE date2024-06-15 AND experiment_groupv2一条命令就能原子化替换当天所有v2组数据避免了Hive表常见的小文件和脏数据问题。注意我们没用Spark MLlib做模型也没用MLflow管实验。这里的“AB Testing”是纯工程验证——验证一个Solr排序规则变更是否提升了GMV不是训练CTR预估模型。所以Spark角色很纯粹清洗原始日志→关联用户标签→按会话归因→写入分析表。2.4 整体架构图与数据流向文字描述版整个链路分四层无单点故障接入层Nginx日志前端埋点SDK经Flume Agent采集到Kafka Topicsearch-raw保留原始JSON含user_id,query,timestamp,session_id,device_id等字段。实时分组层Spark Streaming消费search-raw调用Solr Realtime Get API获取user_id当前实验组打标后写入Kafka Topicsearch-labeled。此步骤P99延迟200ms是保证分组一致性的第一道闸门。归因计算层Spark Structured Streaming消费search-labeled执行三步操作按session_id窗口聚合搜索行为去重、排序、提取首屏点击位置关联HBase中的用户画像新客/老客/地域/购买力基于event_timewatermark将搜索会话与后续3天内的订单事件来自另一Kafka Topicorders进行left join生成归因记录。分析服务层归因结果写入Delta Lake表ab_results同时Solr增量索引ab_results的摘要字段experiment_group,query,conversion_rate,avg_order_value。BI工具直连Solr做自助分析运营可5秒内看到“v2组搜‘蓝牙耳机’的7日复购率”。这个架构里Solr和Spark没有耦合——Solr不依赖SparkSpark也不依赖Solr。它们通过Kafka和Delta Lake解耦各自专注自己最擅长的事Solr做毫秒级状态读取和亚秒级分面统计Spark做分钟级归因计算和小时级数据沉淀。3. 核心实现细节与关键配置从代码片段到生产调优3.1 Solr Schema设计为AB分析定制的字段类型我们没用默认的_text_字段而是为AB场景精简了schema。以下是managed-schema中关键字段定义Solr 9.3!-- 用户ID用于Realtime Get和facet -- field nameuser_id typestring indexedtrue storedtrue requiredtrue multiValuedfalse/ !-- 实验组标识必须用string而非int便于后期扩展语义如v2-geo-shanghai -- field nameexperiment_group typestring indexedtrue storedtrue requiredtrue/ !-- 搜索词用text_general分词但禁用停用词以保留iPhone等品牌词 -- field namequery typetext_general indexedtrue storedtrue omitNormstrue/ !-- 点击位置数组用intPoint类型支持范围查询如click_position:[1 TO 3] -- field nameclick_positions typepint indexedtrue storedtrue multiValuedtrue/ !-- 时间戳用pdate类型支持date math如NOW/DAY-7 -- field nametimestamp typepdate indexedtrue storedtrue/ !-- 归因结果用booleanPoint存是否转化比string节省80%存储 -- field nameis_converted typepboolean indexedtrue storedtrue/ !-- 订单金额用pfloat存支持facet.range统计区间 -- field nameorder_amount typepfloat indexedtrue storedtrue/最关键的优化在click_positions字段。我们不用strings类型存1,2,3因为facet时会当成一个字符串。改用pint后facet.rangeclick_positionsf.click_positions.facet.range.start1f.click_positions.facet.range.end10f.click_positions.facet.range.gap1能直接输出各位置点击量分布。实测10亿条记录下该facet查询P951.2s而用strings类型需3.8s。实操心得Solr的facet.limit默认是100但AB分析常需看Top 500词。我们全局设为facet.limit500并配facet.sortindex按字典序而非count按频次因为运营更关心“搜什么词的人多”而不是“哪个词点击最多”——前者反映流量结构后者只是结果。3.2 Spark Structured Streaming归因代码状态管理与水位线实战归因的核心难点是如何确保“用户A在6月1日10:00搜‘咖啡机’6月2日15:00下单”被正确关联以下是我们生产环境的Scala代码主干已脱敏// 1. 定义watermark事件时间比处理时间晚2小时即视为迟到 val searchStream spark .readStream .format(kafka) .option(kafka.bootstrap.servers, kafka1:9092,kafka2:9092) .option(subscribe, search-labeled) .load() .select( from_json(col(value).cast(string), searchSchema).alias(data) ).select(data.*) .withWatermark(event_time, 2 hours) // 关键水位线设为2小时 // 2. 订单流同样设watermark但窗口更长因下单可能滞后 val orderStream spark .readStream .format(kafka) .option(subscribe, orders) .load() .select(from_json(col(value), orderSchema).alias(data)) .select(data.*) .withWatermark(event_time, 72 hours) // 下单可滞后3天 // 3. 会话窗口聚合按session_id分组取最早搜索时间、最晚点击时间、首屏点击位置 val sessionizedSearch searchStream .withColumn(session_window, session_window(col(event_time), 30 minutes)) .groupBy(session_id, session_window) .agg( min(event_time).alias(search_start_time), max(event_time).alias(search_end_time), first(user_id).alias(user_id), first(experiment_group).alias(experiment_group), collect_list(click_positions).alias(all_clicks), // 提取首屏点击位置假设click_positions是arrayint element_at(flatten(collect_list(click_positions)), 1).alias(first_click_pos) ) // 4. 关键归因left join搜索会话与订单条件是用户相同订单时间在搜索后3天内 val attributed sessionizedSearch .join( orderStream, expr( sessionizedSearch.user_id orderStream.user_id AND orderStream.event_time BETWEEN sessionizedSearch.search_start_time AND sessionizedSearch.search_start_time interval 3 days ), left ) .select( col(session_id), col(experiment_group), col(search_start_time), col(first_click_pos), // 归因标志只要有关联订单即为true when(col(order_id).isNotNull, lit(true)).otherwise(lit(false)).alias(is_converted), col(order_amount) ) // 5. 写入Delta Lake按日期和实验组动态分区 attributed .writeStream .format(delta) .outputMode(Append) .option(checkpointLocation, /checkpoints/ab-attribution) .partitionBy(date_format(search_start_time, yyyy-MM-dd), experiment_group) .table(ab_results)这段代码有三个必须调优的点Watermark间隔选择设2小时是权衡结果。太短如30分钟会导致大量迟到事件被丢弃太长如6小时则归因延迟过高。我们通过分析历史日志中“搜索到下单”的时间差分布发现99.2%的转化发生在2小时内故定为2小时。Session window时长30分钟是搜索会话的合理上限。用户连续输入多个词如先搜“咖啡”再搜“咖啡机”再搜“德龙咖啡机”通常在30分钟内完成。超过则视为新会话。Join条件中的时间范围BETWEEN A AND A interval 3 days比A B A interval 3 days更高效因为Spark能利用Delta Lake的Z-ordering对search_start_time列做范围裁剪。注意我们没用mapGroupsWithState因为归因逻辑是确定性的窗口计算用groupBywithWatermark更稳定。mapGroupsWithState适合需要跨窗口维护复杂状态的场景如用户生命周期价值预测但AB归因不需要。3.3 Solr实时分组服务如何保证千万QPS下分组不漂移Solr Realtime Get的性能依赖两个配置。我们在solrconfig.xml中做了如下关键修改!-- 1. 关闭不必要的功能聚焦get性能 -- requestHandler name/get classsolr.RealTimeGetRequestHandler lst namedefaults !-- 禁用高开销的highlighter -- str namehloff/str !-- 禁用facetget接口不需要 -- str namefacetoff/str !-- 只返回必要字段减少序列化开销 -- str nameflexperiment_group,version,start_time,expire_time/str /lst /requestHandler !-- 2. 优化docValues缓存加速get查询 -- cache namedocValuesCache classsolr.LRUCache size1024 initialSize512 autowarmCount128/更重要的是数据写入策略。我们不用SolrJ批量add而是用Atomic Updates直接更新用户状态# curl -X POST http://solr:8983/solr/ab-state/update?committrue \ # -H Content-type:application/json \ # --data-binary { # id:U123456, # experiment_group:{set:v2}, # version:{set:2.1.0}, # start_time:{set:2024-06-15T10:00:00Z}, # expire_time:{set:2024-06-15T18:00:00Z} # }这种写法比deleteadd快5倍且避免了并发写入时的竞态条件。我们用Redis分布式锁控制同一用户的更新串行化确保U123456不会在v1和v2间反复横跳。3.4 生产环境调优参数从内存到GC的硬核经验Solr JVM参数堆内存设为32G-Xms32g -Xmx32g但关键在GC。我们用ZGCJDK11-XX:UseZGC -XX:ZCollectionInterval5。实测相比CMSFull GC频率从每2小时1次降到每周1次P99延迟稳定在12ms内。Spark Executor内存分配--executor-memory 16g --executor-cores 4 --driver-memory 8g。其中spark.sql.adaptive.enabledtrue开启自适应查询执行让Spark自动合并小任务。我们发现归因job中90%的task耗时200ms但总有几个task卡在3s以上因数据倾斜AQE能自动触发skew-join优化。Delta Lake写入优化spark.conf.set(spark.databricks.delta.optimizeWrite.enabled, true)开启自动小文件合并。我们设spark.conf.set(spark.databricks.delta.autoCompact.enabled, true)当小文件数50时自动compact。这避免了BI查询时因数千个小文件导致的NameNode压力。踩过的坑最初我们把Solr和Spark部署在同一物理机结果Solr的ZGC和Spark的G1 GC互相干扰导致Solr P99延迟飙升至200ms。拆分为独立节点后延迟回归12ms。结论搜索状态服务和大数据计算必须物理隔离。4. 实战效果与问题排查从上线首日到大促压测的全记录4.1 上线首日遇到的5个典型问题及解决我们按时间线复盘上线首日的真实问题这些在文档里根本找不到问题Solr Realtime Get返回空文档但文档明明存在根因前端传的user_id带空格如 U123456 而Solr的string字段默认trim但Realtime Get的id匹配是精确的。解决在Flume Agent的拦截器中加TrimInterceptor统一去除前后空格。教训所有外部输入必须在入口处清洗不能依赖下游校验。问题Spark Streaming job频繁重启日志报Failed to connect to Kafka broker根因Kafka集群启用了SASL/PLAIN认证但Spark配置中漏了kafka.sasl.jaas.config。解决在spark-defaults.conf中添加spark.kafka.sasl.jaas.configorg.apache.kafka.common.security.plain.PlainLoginModule required usernamexxx passwordyyy;。注意密码不能明文写我们用HashiCorp Vault动态注入。问题归因结果中is_converted全为false根因订单流的event_time字段是字符串格式2024-06-15 15:30:00而Spark的to_timestamp默认解析失败返回null导致join条件永远不成立。解决在订单流读取后加.withColumn(event_time, to_timestamp(col(event_time), yyyy-MM-dd HH:mm:ss))。实测加此行后归因率从0%升至23.7%符合历史基线。问题Solr facet.pivot查询超时日志报org.apache.solr.common.SolrException: Query Timeout根因facet.pivot的嵌套层级过深我们试过experiment_group,query,category,brand四级导致Lucene查询树爆炸。解决限制为两级experiment_group,query更高维分析用Spark SQL查Delta Lake。原则Solr只做实时、轻量、高频的下钻重分析交给Spark。问题Delta Lake写入失败报ConcurrentModificationException根因多个Spark Streaming job同时写同一张表且没用VACUUM清理旧版本。解决为每个job分配唯一table_name如ab_results_v2,ab_results_v3并通过视图CREATE VIEW ab_results AS SELECT * FROM ab_results_v3统一出口。经验生产环境严禁多job写同一Delta表。4.2 大促压测关键指标与达标情况我们在618前72小时进行了全链路压测模拟峰值QPS 12万搜索请求 3万订单请求指标目标值实测值达标情况说明Solr Realtime Get P99延迟≤20ms14.3ms✅ZGC调优后稳定Spark Streaming端到端延迟搜索→归因写入≤5分钟3分42秒✅Watermark设2小时足够覆盖99.2%事件Delta Lake写入吞吐≥50MB/s68MB/s✅开启optimizeWrite后小文件减少76%Solr facet.pivot QPS≥50005210✅两级pivot无超时归因准确率人工抽样≥99.5%99.82%✅抽样1000条仅2条因网络抖动丢失订单事件最值得说的是归因准确率。我们用“订单ID反查搜索会话”的方式人工验证随机取1000个订单看其关联的搜索会话是否真实发生。998条匹配2条不匹配——查日志发现是用户在APP内搜索但订单在微信小程序完成设备ID不一致导致关联失败。这属于业务场景限制非技术缺陷。4.3 运营侧真实收益从“猜”到“证”的范式转移技术指标再漂亮不如业务结果直观。上线后三个月我们观察到三个质变决策周期缩短过去一个搜索排序策略迭代从开发→测试→上线→看数据→结论平均耗时5.3天。现在平均11.2小时。运营说“以前改个boost参数要等一天看效果现在下午改晚饭前就知道行不行。”实验粒度细化旧系统只能按“全站”或“一级类目”分组。现在能按“新客华东地区搜索‘iPhone’”这种五维组合做实验。我们发现v2策略对“新客搜iPhone”的点击率提升12.3%但对“老客搜iPhone”反而降0.8%这直接指导了后续的个性化策略。归因深度延伸首次实现“搜索→加购→下单→复购”的四阶归因。数据显示v2策略虽使首单GMV升3.2%但7日复购率降1.1%说明它吸引了更多冲动型用户。这个洞察旧系统完全看不到。最后分享一个小技巧我们给每个实验组生成一个唯一的experiment_id如v2-geo-shanghai-newuser并把它作为Solr文档的_version_字段值。这样当运营在BI里看到异常数据时可以直接用curl http://solr:8983/solr/ab-state/get?idU123456查到该用户当前属于哪个实验甚至能追溯到该实验的配置快照存于Git。这把“数据可解释性”做到了极致。