impala be query plan 3 prepare->open->close

知识点 ControlService

QueryState

QueryState 是为某个查询创建的所有后端执行状态(例如各个片段实例的 FragmentInstanceState)的中心类。该类包含(或可通过它访问)在片段实例之间共享的状态;与之相对,特定于某个片段实例的状态则保存在 FragmentInstanceState 中。QueryState 的生命周期由引用计数决定:任何代表查询执行并访问其状态的线程,都必须先获取对应 QueryState 的引用,并至少在访问期间持有该引用。引用通过 QueryExecMgr::Get-/ReleaseQueryState() 或 QueryState::ScopedRef(后者用于仅限于单个函数或代码块作用域的引用)获取和释放。只要引用计数大于 0,查询的所有控制结构(包含在该类中或可通过该类访问的结构,如 FragmentInstanceState)都保证有效。

FragmentInstanceState

FragmentInstanceState处理单个计划片段实例执行的所有方面,包括成功和错误情况下的设置和终结。Close()在Exec()结束时自动发生,释放为此片段实例分配的所有内存,并关闭所有数据流。

堆栈

  • ControlService::ControlService(MetricGroup* metric_group)
  • this->ExecQueryFInstances(static_cast<const ExecQueryFInstancesRequestPB*>(req),
  • void ControlService::ExecQueryFInstances(const ExecQueryFInstancesRequestPB* request, ExecQueryFInstancesResponsePB* response, RpcContext* rpc_context)
  • const Status& fragment_info_sidecar_status = GetSidecar(request->plan_fragment_info_sidecar_idx(), rpc_context, &fragment_info);
  • Status resp_status = ExecEnv::GetInstance()->query_exec_mgr()->StartQuery(request, query_ctx, fragment_info);
  • QueryState* qs = GetOrCreateQueryState(query_ctx, request->per_backend_mem_limit(), &dummy);
  • Status status = qs->Init(request, fragment_info);
  • 这里主要为初始化
  • TExecPlanFragmentInfo by fragment_info
  • unique_ptr<Thread> t;
  • status = Thread::Create("query-exec-mgr",Substitute("query-state-$0", PrintId(query_id)), &QueryExecMgr::ExecuteQueryHelper, this, qs, &t, true);
  • bool QueryState::StartFInstances()
  • bool QueryState::StartFInstances();
  • start_finstances_status = FragmentState::CreateFragmentStateMap(fragment_info_, exec_rpc_params_, this, fragment_state_map_)
  • for(fragment_size)
  • //根据instance_size 创建分布式fragment, 即单个查询拆分为多个fragment
  • FragmentState* fragment_state = state->obj_pool()->Add(new FragmentState(state, frag, frag_ctx));
  • for(fragment_size)
  • fragment_state->init();
  • Status PlanNode::CreateTree(FragmentState* state, const TPlan& plan, PlanNode** root)
  • Status status = CreateTreeHelper(state, plan.nodes, NULL, &node_idx, root); (递归创建)
  • Status PlanNode::CreateTreeHelper(FragmentState* state, const std::vector<TPlanNode>& tnodes, PlanNode* parent, int* node_idx, PlanNode** root)
  • const TPlanNode& tnode = tnodes[*node_idx];
  • int num_children = tnode.num_children;
  • RETURN_IF_ERROR(CreatePlanNode(state->obj_pool(), tnode, &node));(创建PlanNode)
  • *node = pool->Add(new ScanPlanNode());/PartitionedHashJoinPlanNode/
  • for(num_children)
  • CreateTreeHelper(state, tnodes, node, node_idx, nullptr)); 递归
  • RETURN_IF_ERROR(node->Init(tnode, state));
  • Status = HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* state)(这里可以了解下 impala SharedState 的概念,将同一个 be 节点上不同的 scan node 放到了一个队列来处理)
  • for (auto& fragment : fragment_state_map_) {
  • FragmentState* fragment_state = fragment.second;
  • for (int i = 0; i < fragment_state->instance_ctxs().size(); ++i)
  • //创建 FragmentInstanceState
  • FragmentInstanceState* fis = obj_pool_.Add(new FragmentInstanceState(this, fragment_state, *instance_ctx, *instance_ctx_pb));
  • fis_map_.emplace(fis->instance_id(), fis);
  • unique_ptr<Thread> t;
  • //执行单个 FragmentInstanceState ExecFInstance
  • Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,[this, fis]() { this->ExecFInstance(fis); }, &t, true);
  • void QueryState::ExecFInstance(FragmentInstanceState* fis)
  • Status status = fis->Exec();(Status FragmentInstanceState::Exec())
  • Status status = Prepare();
  • status = Open();
  • Close();
  • Status FragmentInstanceState::Prepare()
  • runtime_state_ = obj_pool()->Add(new RuntimeState(query_state_, fragment_,instance_ctx_, fragment_ctx_, instance_ctx_pb_, ExecEnv::GetInstance()));
  • Init();
  • resource_pool_ = ExecEnv::GetInstance()->thread_mgr()->CreatePool();
  • instance_mem_tracker_ = obj_pool()->Add(new MemTracker(runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker()));
  • runtime_state_->resource_pool()->AcquireThreadToken();(获取一个线程资源)
  • const PlanNode* plan_tree = fragment_state_->plan_tree();
  • //ExecNode 执行节点,根据 PlanNode 创建ExecNode
  • RETURN_IF_ERROR(ExecNode::CreateTree(runtime_state_, *plan_tree, query_state_->desc_tbl(), &exec_tree_));
  • RETURN_IF_ERROR(plan_node.CreateExecNode(state, root));(这里举例一个PartitionedHashJoinNode)
  • ObjectPool* pool = state->obj_pool();
  • *node = pool->Add(new PartitionedHashJoinNode(state, *this, state->desc_tbl()));(Status HdfsScanPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) ScanNode 也是同理)
  • for (auto& child : plan_node.children_) { 递归创建子节点的ExecNode
  • ExecNode* child_node;
  • RETURN_IF_ERROR(CreateTree(state, *child, descs, &child_node));
  • DCHECK(child_node != nullptr);
  • (*root)->children_.push_back(child_node);
  • }
  • //当前 Fragment Instance State ExecNode 创建完成
  • //1 ExchangeNode
  • // set #senders of exchange nodes before calling Prepare()
  • vector<ExecNode*> exch_nodes;
  • exec_tree_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
  • //2 scanNode
  • vector<ExecNode*> scan_nodes;
  • ScanRangesPB no_scan_ranges;
  • exec_tree_->CollectScanNodes(&scan_nodes);
  • static_cast<ScanNode*>(scan_node)->SetScanRanges(scan_ranges.scan_ranges());
  • //3
  • RETURN_IF_ERROR(exec_tree_->Prepare(runtime_state_)); //Status ExecNode::Prepare(RuntimeState* state)
  • mem_tracker_.reset(new MemTracker(runtime_profile_, -1, runtime_profile_->name(),
  • for (int i = 0; i < children_.size(); ++i) {
  • RETURN_IF_ERROR(children_[i]->Prepare(state));
  • }
  • //Status HdfsScanNodeMt::Prepare(RuntimeState* state)
  • //4 prepare sink_
  • const DataSinkConfig* sink_config = fragment_state_->sink_config();
  • DCHECK(sink_config != nullptr);
  • sink_ = sink_config->CreateSink(runtime_state_);
  • RETURN_IF_ERROR(sink_->Prepare(runtime_state_, runtime_state_->instance_mem_tracker()));
  • //5 row batch 数据
  • row_batch_.reset(new RowBatch(exec_tree_->row_desc(), runtime_state_->batch_size(),runtime_state_->instance_mem_tracker()));
  • Status FragmentInstanceState::Open()
  • RETURN_IF_ERROR(exec_tree_->Open(runtime_state_));
  • return sink_->Open(runtime_state_);
  • void FragmentInstanceState::Close()
  • for (int i = 0; i < children_.size(); ++i) {
  • children_[i]->Close(state);
  • }
展开

认识一下FragmentInstanceState

本站文章资源均来源自网络,除非特别声明,否则均不代表站方观点,并仅供查阅,不作为任何参考依据!
如有侵权请及时跟我们联系,本站将及时删除!
如遇版权问题,请查看 本站版权声明
THE END
分享
二维码
海报
<<上一篇
下一篇>>