Disaggregation

请求处理

start_disaggregation_event_loop函数创建loop处理不同逻辑的Task 创建steersman记录每个replica的物理所在位置,spawn_migration_queue,并对每个replica spawn出disaggregation_event_loop_inner,以状态机的实现方式,向对应的replica发送请求,以及根据replica的状态进行请求的传递(zigzag/pd migration)。 之后,创建DisaggregationController,进行reject_kv_cache_event_loop(避免decode实例过载)和report_system_metrics 不指定manually_scale编译选项时spawn出auto_scale_event_loop进行自动扩容()

auto_scale_event_loop

在planner的replica_state_moniter_loop中 循环generate_scale_target,并判断当前扩缩容feature,对于blitz使用exec_blitz中的扩缩容方案,sllm使用exec_serverless中的方案。 exec_serverless直接向实例发送请求,从hostmem或者disk中加载参数 而exec_blitz则使用RDMA p2p, NVLink p2p, NVLink broadcast或Tanz等方式进行扩容

reject_kv_cache_event_loop

动态检测并锁定某些decode副本,防止继续接收kvcache请求。 遍历所有副本,并记录处于decode或Rdmasending状态副本的used_blocks。之后如果发现有decode使用的blocks高于平均值的两倍,则锁定该副本。 实际执行上会与上次锁定的副本进行比较,只锁定新增的副本。

report_network_flow

收集每个replica的flow_in和flow_out,构建flow_map用来打印日志

report_system_metrics

遍历每个replica_metric,打印state,used_blocks以及是否加载模型等信息

report_cur_waiting_prefill_tokens

当前deprecated

disaggregation_event_loop_inner

核心循环,以状态机的方式实现对不同replica状态的逻辑,包括batch发送,请求migration等逻辑。对于每个replica,都创建出处理当前replica的event_loop。传入参数如下

  1. 用于同步的init_guard
  2. 状态切换命令的ReplicaCommand Channel
  3. queue
  4. migration queue
  5. replica_metric

在循环中每次取出当前replica状态,尝试接收Command并根据状态执行不同的处理逻辑。没有接收到Command就直接使用当前状态

inactive/loadingprefill/loadingdecode

yield等待

shuttingnull

将当前replica上锁,避免被其他进程操作。 yield等待

decode

循环从migration_queue中try_consume batch(tokens batch entries)并压入global_batch。如果global_batch已经为空则yield释放,否则进行一次Decode 包括对系统metric进行设置,将batch和last_token通过stub发送给后端(decode_v2),将generations通过stream sender发送给等待向流式请求返回响应的线程。如果是最后一个token返回InferStreamResponse::End,否则返回InferStreamResponse::Intermediate,接收者会进行不同的处理。当前loop会在请求生成最后一个token时从entries中移除该请求。之后将结束的请求向server发送clear_cache,未结束的请求发送filter_batch并重新放入global_batch,进行下一轮处理

prefill

如果当前需求的block已经大于每个replica最大的block数,则yield 否则从queue中调用next_batch得到下一个prefill batch,增加使用的blocks计数,构造PrefillCase::Normal的prefillRequest,调用stub.prefill_v2得到prefill的Response,发送InferStreamResponse::PrefillDone & InferStreamReponse::Intermediate | InferStreamResponse::End,并判断client是否quit,如果由于client quit或Response::End则从entries中移除,并根据移除情况进行clear_cache / filter_batch 由于是PD分离,将Prefill的结果构造为MigrationBatch,通过flow_watcher和migration_queue进行传递(batch entries tokens),交由Decode实例进行Decode阶段

newprefill

用于ZigZag执行的前半段,确保当前replica持有锁 初始化zig_zag相关的时间戳并检查模型是否已经加载 模型已加载则从pending_zag_prefill队列头部取出batch和前半段prefill的response,执行post_prefill_pre_decode。然后create_refact_head_task创建任务(循环从pending_zag_prefill请求中取出等待,并将将结果进行filter和send),切换到RefactoryPrefill状态,进入下一轮循环 如果当前replica使用的block已经超上限,则yield进入下一轮循环 如果模型未加载完毕,则从Relay_queue中尝试取出relay请求,向所有stub发送relay请求等待response(将完成的前半段请求传递给old prefill)。 如果response中包含batch_id和seq_num,则循环从pending_zag_prefill中pop。如果pop出的prefill response start_layer为空则post_prefill_pre_decode,并修改model_loaded为true,否则break循环。此时已经得到start_layer非空的request,根据start_layer类型,transformerlayer部分迁移,向migration_queue append_partial_fst,之后通过sender通知OldPrefill(forwardcase为naivepp和immigrate)。embeddinglayer则直接迁移,同样sender通知OldPrefill(forwardcase为normal)并通知所有stub clear_cache 如果response中不包含batch_id和seq_num,则重新判断model_loaded。如果model_loaded被置为true

之后判断pending_zag_prefill的长度并yield控制zigzag速率 从batching_queue中获取下一个batch并发起新的zigzagprefill,压入pending_zag_prefill队列等待后续处理

refactoryprefill

这个状态用于批量处理pending_zag_prefill队列中的任务,在relay_queue中检查relay请求,如果有则进入relay处理流程 relay处理流程首先向所有stub调用relay操作(传入relay_rank),如果response中包含了batch_id和seq_num,则说明有batch需要迁移,取出pending_zag_prefill中对应的future,等待完成之后判断response的start_layer,根据transformer和embedding进行NewPrefill中同样的处理。 如果response不包含batch_id,说明迁移已经完成,统计pending_zag_prefill并进行异常检查,通过sender通知OldPrefill,等待refactory_head_task结束之后重置状态为Prefill

oldprefill

用于ZigZag执行的后半段 调用全局relay_queue的append,在这里通过async_channel_sender发送请求(indices: 当前所在PP rank),等待收到PartialPrefill请求。 收到partialprefill请求后调用prefill_v2并执行filter、send逻辑。 如果请求的start_layer是TransformerLayer,则PartialMigration(append_partial_snd,因为前半段prefill也有kvcache需要migration),指定层级迁移部分kvcache;如果是EmbeddingLayer则append迁移整个batch的kvcache,因为此时当前replica保有全部的kvcache 这两者的不同会在spawn_migration_queue中进行分别处理。 如果在relay_queue中没有拿到partialprefill请求,则等待relay_cnt归零将状态切换为Prefill

mutatingtodecode

向Migration_queue发送Flush命令,将所有属于当前replica未迁移的batch一次性取出,放入global_entries和global_batches,供decode后续使用。 之后状态切换为Decode

auspreflil

持锁之后转变为prefill

ausdecode

持锁之后转变为decode

shuttingdecode

decode状态切换到彻底关闭的清理状态,目的是完成所有剩余的decode任务。如果global_batches为空并且持有锁则切换到shuttingNull 如果还没有持有锁,则尝试异步获取锁防止并发冲突(或yield等待join_handle结束持锁) 获取到锁之后不断从migration_queue中消费属于本副本的migration_queue,并循环从global_batches中取出request进行decode,同样对每轮生成filter_send_generations,直到global_batches被清空

shuttingprefill

当unfinished_migration_count归零时切换到ShuttingNull,否则yield等待 unfinished_migration_count会在向migration_queue中压入batch时increment

sending/loading/casting

panic

扩缩容

通过状态机进行状态的切换