Impala:Impalad impala-server beeswax 调用过程及关系图
一 UML
二 本文主要讲解过程
- main(daemon-main.cc)
- ImpaladMain/StatestoredMain/CatalogdMain/AdmissiondMain (impalad-main.cc)
- ExecEnv.init()
- impala_server = new ImpalaServer(exec_env)
- impala_server->Start(FLAGS_beeswax_port, FLAGS_hs2_port, FLAGS_hs2_http_port, FLAGS_external_fe_port)
- impala_server->Join()
- // Initialize the client servers.
- shared_ptr<ImpalaServer> handler = shared_from_this();
- if (beeswax_port > 0 || (TestInfo::is_test() && beeswax_port == 0))
- {
- //ImpalaServiceProcessor
- shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
- //ImpalaServiceProcessor
- shared_ptr<TProcessorEventHandler> event_handler(new RpcEventHandler("ImpalaServiceProcessor", exec_env_->metrics()));
-
- //设置事件句柄(event handler)
- beeswax_processor->setEventHandler(event_handler);
-
- //BEESWAX_SERVER_NAME beeswax_processor beeswax_port ThriftServerBuilder
- ThriftServerBuilder builder(BEESWAX_SERVER_NAME, beeswax_processor, beeswax_port);
-
- if (IsExternalTlsConfigured())
- {
- LOG(INFO) << "Enabling SSL for Beeswax";
- builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
- .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
- .ssl_version(ssl_version)
- .cipher_list(FLAGS_ssl_cipher_list);
- }
-
- ThriftServer * server;
- //build server
- //这是属于啥语法?——这是链式调用(方法链):在 builder 上依次调用各个设置方法,最后由 Build(&server) 构造出 ThriftServer
- RETURN_IF_ERROR(builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
- .metrics(exec_env_->metrics())
- .max_concurrent_connections(FLAGS_fe_service_threads)
- .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
- .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
- .Build(&server));
-
- //beeswax_server_
- beeswax_server_.reset(server);
- beeswax_server_->SetConnectionHandler(this);
- }
-
-
- if (beeswax_server_.get())
- {
- RETURN_IF_ERROR(beeswax_server_->Start());
- LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_server_->port();
- }
三 基础关系篇
1 BeeswaxServiceIf 系列
- class ImpalaServiceIf : virtual public ::beeswax::BeeswaxServiceIf
-
- class ImpalaServer : public ImpalaServiceIf,
- public ImpalaHiveServer2ServiceIf,
- public ThriftServer::ConnectionHandlerIf,
- public std::enable_shared_from_this<ImpalaServer>,
- public CacheLineAligned
2 TProcessorEventHandler
- namespace apache {
- namespace thrift {
-
- /**
- * Virtual interface class that can handle events from the processor. To
- * use this you should subclass it and implement the methods that you care
- * about. Your subclass can also store local data that you may care about,
- * such as additional "arguments" to these methods (stored in the object
- * instance's state).
- */
- class TProcessorEventHandler {
- public:
- virtual ~TProcessorEventHandler() {}
-
- /**
- * Called before calling other callback methods.
- * Expected to return some sort of context object.
- * The return value is passed to all other callbacks
- * for that function invocation.
- */
- virtual void* getContext(const char* fn_name, void* serverContext) {
- (void)fn_name;
- (void)serverContext;
- return NULL;
- }
-
- /**
- * Expected to free resources associated with a context.
- */
- virtual void freeContext(void* ctx, const char* fn_name) {
- (void)ctx;
- (void)fn_name;
- }
-
- /**
- * Called before reading arguments.
- */
- virtual void preRead(void* ctx, const char* fn_name) {
- (void)ctx;
- (void)fn_name;
- }
-
- /**
- * Called between reading arguments and calling the handler.
- */
- virtual void postRead(void* ctx, const char* fn_name, uint32_t bytes) {
- (void)ctx;
- (void)fn_name;
- (void)bytes;
- }
-
- /**
- * Called between calling the handler and writing the response.
- */
- virtual void preWrite(void* ctx, const char* fn_name) {
- (void)ctx;
- (void)fn_name;
- }
-
- /**
- * Called after writing the response.
- */
- virtual void postWrite(void* ctx, const char* fn_name, uint32_t bytes) {
- (void)ctx;
- (void)fn_name;
- (void)bytes;
- }
- }
- }
-
-
-
- namespace impala
- {
-
- /// An RpcEventHandler is called every time an Rpc is started and completed. There is at
- /// most one RpcEventHandler per ThriftServer. When an Rpc is started, getContext() creates
- /// an InvocationContext recording the current time and other metadata for that invocation.
- class RpcEventHandler : public apache::thrift::TProcessorEventHandler
- {
- public:
- RpcEventHandler(const std::string & server_name, MetricGroup * metrics);
-
- }
- }
3 TProcessor 系列
- namespace apache {
- namespace thrift {
- /**
- * A processor is a generic object that acts upon two streams of data, one
- * an input and the other an output. The definition of this object is loose,
- * though the typical case is for some sort of server that either generates
- * responses to an input stream or forwards data from one pipe onto another.
- *
- */
- class TProcessor {
- public:
- virtual ~TProcessor() {}
-
- virtual bool process(stdcxx::shared_ptr<protocol::TProtocol> in,
- stdcxx::shared_ptr<protocol::TProtocol> out,
- void* connectionContext) = 0;
-
- bool process(stdcxx::shared_ptr<apache::thrift::protocol::TProtocol> io, void* connectionContext) {
- return process(io, io, connectionContext);
- }
-
- stdcxx::shared_ptr<TProcessorEventHandler> getEventHandler() const { return eventHandler_; }
-
- void setEventHandler(stdcxx::shared_ptr<TProcessorEventHandler> eventHandler) {
- eventHandler_ = eventHandler;
- }
-
- protected:
- TProcessor() {}
-
- stdcxx::shared_ptr<TProcessorEventHandler> eventHandler_;
- };
- }
-
-
- namespace beeswax
- {
- stdcxx::shared_ptr<TProcessorEventHandler> eventHandler_;
-
-
-
- //这里只是作了一层抽象
- template <class Protocol_>
- class TDispatchProcessorT : public TProcessor
- {
-
- }
-
- template <class Protocol_>
- class BeeswaxServiceProcessorT : public ::apache::thrift::TDispatchProcessorT<Protocol_>
- {
- protected:
- ::apache::thrift::stdcxx::shared_ptr<BeeswaxServiceIf> iface_;
- BeeswaxServiceProcessorT(::apache::thrift::stdcxx::shared_ptr<BeeswaxServiceIf> iface) :
- iface_(iface) {}
- }
- }
-
- namespace apache {
- namespace thrift {
- namespace protocol {
-
- class TProtocol
- {
- virtual uint32_t writeBool_virt(const bool value) = 0;
-
- virtual uint32_t writeByte_virt(const int8_t byte) = 0;
-
- virtual uint32_t writeI16_virt(const int16_t i16) = 0;
-
- virtual uint32_t writeI32_virt(const int32_t i32) = 0;
-
- }
-
- class TDummyProtocol : public TProtocol {};
- }
- }
- }
-
-
- namespace impala
- {
- template <class Protocol_>
- class ImpalaServiceProcessorT : public ::beeswax::BeeswaxServiceProcessorT<Protocol_>
- {
- protected:
- ::apache::thrift::stdcxx::shared_ptr<ImpalaServiceIf> iface_;
- ImpalaServiceProcessorT(::apache::thrift::stdcxx::shared_ptr<ImpalaServiceIf> iface) :
- ::beeswax::BeeswaxServiceProcessorT<Protocol_>(iface),
- iface_(iface)
- }
-
- typedef ImpalaServiceProcessorT< ::apache::thrift::protocol::TDummyProtocol > ImpalaServiceProcessor;
-
- }
4 TServer
- namespace apache {
- namespace thrift {
- namespace concurrency {
- class Runnable {
- public:
- virtual ~Runnable(){};
- virtual void run() = 0;
- virtual stdcxx::shared_ptr<Thread> thread() { return thread_.lock(); }
- virtual void thread(stdcxx::shared_ptr<Thread> value) { thread_ = value; }
-
- private:
- stdcxx::weak_ptr<Thread> thread_;
- };
- }
-
-
- namespace apache {
- namespace thrift {
- namespace server {
-
- class TServer : public concurrency::Runnable {
- //Contain TProcessor TServerTransport TTransportFactory TProtocolFactory
- TServer(const stdcxx::shared_ptr<TProcessor>& processor,
- const stdcxx::shared_ptr<TServerTransport>& serverTransport,
- const stdcxx::shared_ptr<TTransportFactory>& transportFactory,
- const stdcxx::shared_ptr<TProtocolFactory>& protocolFactory)
- : processorFactory_(new TSingletonProcessorFactory(processor)),
- serverTransport_(serverTransport),
- inputTransportFactory_(transportFactory),
- outputTransportFactory_(transportFactory),
- inputProtocolFactory_(protocolFactory),
- outputProtocolFactory_(protocolFactory) {}
-
-
- public:
- virtual ~TServer() {}
- virtual void serve() = 0;
- virtual void stop() {}
- // Allows running the server as a Runnable thread
- virtual void run() { serve(); }
- }
- /**
- * In TAcceptQueueServer, the main server thread calls accept() and then immediately
- * places the returned TTransport on a queue to be processed by a separate thread,
- * asynchronously.
- *
- * This helps solve IMPALA-4135, where connections were timing out while waiting in the
- * OS accept queue, by ensuring that accept() is called as quickly as possible.
- */
- class TAcceptQueueServer : public TServer
- {
- void serve() override;
- }
-
- class TAcceptQueueServer::Task : public Runnable
- {
- void run() override
- }
- }
5 ThreadPool
- namespace apache {
- namespace thrift {
- namespace transport {
- /**
- * Generic interface for a method of transporting data. A TTransport may be
- * capable of either reading or writing, but not necessarily both.
- *
- */
- class TTransport {
- protected:
- /**
- * Simple constructor.
- */
- TTransport() {}
- public:
- virtual bool isOpen() { return false; }
- virtual void close() {
- throw TTransportException(TTransportException::NOT_OPEN, "Cannot close base TTransport.");
- }
-
- uint32_t read(uint8_t* buf, uint32_t len) {
- T_VIRTUAL_CALL();
- return read_virt(buf, len);
- }
-
- void write(const uint8_t* buf, uint32_t len) {
- T_VIRTUAL_CALL();
- write_virt(buf, len);
- }
- }
-
-
- namespace apache
- {
- namespace thrift
- {
- namespace server
- {
- using apache::thrift::TProcessor;
- using apache::thrift::concurrency::Monitor;
- using apache::thrift::concurrency::ThreadFactory;
- using apache::thrift::transport::TServerTransport;
- using apache::thrift::transport::TTransportFactory;
-
- struct TAcceptQueueEntry
- {
- std::shared_ptr<TTransport> client_; //Client 封装
- int64_t expiration_time_ = 0LL;
- };
- }
- }
-
-
- namespace impala
- {
- /// Simple threadpool which processes items (of type T) in parallel which were placed on a
- /// blocking queue by Offer(). Each item is processed by a single user-supplied method.
- template <typename T>
- class ThreadPool : public CacheLineAligned
- {
- public:
- /// Signature of a work-processing function. Takes the integer id of the thread which is
- /// calling it (ids run from 0 to num_threads - 1) and a reference to the item to
- /// process.
- /// WorkFunction:处理每个工作项(workitem)的回调函数类型,可以传入 Lambda 表达式
- typedef boost::function<void(int thread_id, const T & workitem)> WorkFunction;
-
- /// Creates a new thread pool without starting any threads. Code must call
- /// Init() on this thread pool before any calls to Offer().
- /// -- num_threads: how many threads are part of this pool
- /// -- queue_size: the maximum size of the queue on which work items are offered. If the
- /// queue exceeds this size, subsequent calls to Offer will block until there is
- /// capacity available.
- /// -- work_function: the function to run every time an item is consumed from the queue
- /// -- fault_injection_eligible - If set to true, allow fault injection at this
- /// callsite (see thread_creation_fault_injection). If set to false, fault
- /// injection is disabled at this callsite. Thread creation sites that crash
- /// Impala or abort startup must have this set to false.
- ThreadPool(
- const std::string & group,
- const std::string & thread_prefix,
- uint32_t num_threads,
- uint32_t queue_size,
- const WorkFunction & work_function,
- bool fault_injection_eligible = false)
- : group_(group)
- , thread_prefix_(thread_prefix)
- , num_threads_(num_threads)
- , work_function_(work_function)
- , work_queue_(queue_size)
- , fault_injection_eligible_(fault_injection_eligible)
- {
-
- }
- /// Create the threads needed for this ThreadPool. Returns an error on any
- /// error spawning the threads.
- Status Init()
- {
- for (int i = 0; i < num_threads_; ++i)
- {
- std::stringstream threadname;
- threadname << thread_prefix_ << "(" << i + 1 << ":" << num_threads_ << ")";
- std::unique_ptr<Thread> t;
- Status status = Thread::Create(
- group_,
- threadname.str(),
- boost::bind<void>(boost::mem_fn(&ThreadPool<T>::WorkerThread), this, i),
- &t,
- fault_injection_eligible_);
- if (!status.ok())
- {
- // The thread pool initialization failed. Shutdown any threads that were
- // spawned. Note: Shutdown() and Join() are safe to call multiple times.
- Shutdown();
- Join();
- return status;
- }
- threads_.AddThread(std::move(t));
- }
- initialized_ = true;
- return Status::OK();
- }
- /// Blocking operation that puts a work item on the queue. If the queue is full, blocks
- /// until there is capacity available. The ThreadPool must be initialized before
- /// calling this method.
- //
- /// 'work' is copied into the work queue, but may be referenced at any time in the
- /// future. Therefore the caller needs to ensure that any data referenced by work (if T
- /// is, e.g., a pointer type) remains valid until work has been processed, and it's up to
- /// the caller to provide their own signalling mechanism to detect this (or to wait until
- /// after DrainAndShutdown returns).
- //
- /// Returns true if the work item was successfully added to the queue, false otherwise
- /// (which typically means that the thread pool has already been shut down).
- template <typename V>
- bool Offer(V && work)
- {
- DCHECK(initialized_);
- return work_queue_.BlockingPut(std::forward<V>(work));
- }
-
- /// Blocks until the work item is placed on the queue or the timeout expires. The
- /// ThreadPool must be initialized before calling this method. The same requirements
- /// about the lifetime of 'work' applies as in Offer() above. If the operation times
- /// out, the work item can be safely freed.
- ///
- /// Returns true if the work item was successfully added to the queue, false otherwise
- /// (which means the operation timed out or the thread pool has already been shut down).
- template <typename V>
- bool Offer(V && work, int64_t timeout_millis)
- {
- DCHECK(initialized_);
- int64_t timeout_micros = timeout_millis * MICROS_PER_MILLI;
- return work_queue_.BlockingPutWithTimeout(std::forward<V>(work), timeout_micros);
- }
- private:
- /// Driver method for each thread in the pool. Continues to read work from the queue
- /// until the pool is shutdown.
-
- //工作线程
- void WorkerThread(int thread_id)
- {
- while (!IsShutdown())
- {
- T workitem;
- if (work_queue_.BlockingGet(&workitem))
- {
- work_function_(thread_id, workitem);
- }
- if (work_queue_.Size() == 0)
- {
- /// Take lock to ensure that DrainAndShutdown() cannot be between checking
- /// GetSize() and wait()'ing when the condition variable is notified.
- /// (It will hang if we notify right before calling wait().)
- std::unique_lock<std::mutex> l(lock_);
- empty_cv_.NotifyAll();
- }
- }
- }
- uint32_t num_threads_;
-
- /// User-supplied method to call to process each work item.
- WorkFunction work_function_;
- }
6 TAcceptQueueServer
- namespace apache {
- namespace thrift {
- namespace concurrency {
- class Runnable {
- public:
- virtual ~Runnable(){};
- virtual void run() = 0;
- virtual stdcxx::shared_ptr<Thread> thread() { return thread_.lock(); }
- virtual void thread(stdcxx::shared_ptr<Thread> value) { thread_ = value; }
-
- private:
- stdcxx::weak_ptr<Thread> thread_;
- };
- class ThreadFactory {
- protected:
- ThreadFactory(bool detached) : detached_(detached) { }
- }
- }
-
-
-
- namespace apache {
- namespace thrift {
- namespace server {
-
- class TServer : public concurrency::Runnable {
- public:
- virtual ~TServer() {}
- virtual void serve() = 0;
- virtual void stop() {}
- // Allows running the server as a Runnable thread
- virtual void run() { serve(); }
- }
-
-
-
- }
-
- class TAcceptQueueServer : public TServer
- {
- std::shared_ptr<ThreadFactory> threadFactory_;
-
- void TAcceptQueueServer::serve()
- {
- //SetupConnection 调用
- //Thread Pool
- //WorkFunction
- ThreadPool<shared_ptr<TAcceptQueueEntry>> connection_setup_pool(
- "setup-server",
- "setup-worker",
- FLAGS_accepted_cnxn_setup_thread_pool_size,
- FLAGS_accepted_cnxn_queue_depth,
- [this](int tid, const shared_ptr<TAcceptQueueEntry> & item) { this->SetupConnection(item); });
-
- Status status = connection_setup_pool.Init();
-
- while (!stop_)
- {
- try
- {
- // Fetch client from server
- shared_ptr<TTransport> client = serverTransport_->accept();
-
- TSocket * socket = reinterpret_cast<TSocket *>(client.get());
- //new Connection
- VLOG(1) << Substitute("New connection to server $0 from client $1", name_, socket->getSocketInfo());
-
- shared_ptr<TAcceptQueueEntry> entry{new TAcceptQueueEntry};
- entry->client_ = client;
- if (queue_timeout_ms_ > 0)
- {
- entry->expiration_time_ = MonotonicMillis() + queue_timeout_ms_;
- }
-
- // New - the work done to set up the connection has been moved to SetupConnection.
- // Note that we move() entry so it's owned by SetupConnection thread.
- connection_setup_pool.Offer(std::move(entry));
- }
- }
-
- }
-
- //处理一个 Entry 即一个Client
- void TAcceptQueueServer::SetupConnection(shared_ptr<TAcceptQueueEntry> entry)
- {
- //new Task
- //获取参数 成员变量
- TAcceptQueueServer::Task * task = new TAcceptQueueServer::Task(*this, processor, inputProtocol, outputProtocol, client);
- // Create a task
- shared_ptr<Runnable> runnable = shared_ptr<Runnable>(task);
- // Create a thread for this task
- //这里就相当关键了 会去调用 thrift-server newThread
- //将 Thrift-Server 的 ThriftThread 的 runnable 设置为 TAcceptQueueServer::Task
- shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(runnable));
-
- tasks_.insert(task);
- }
- }
-
-
- //跟 class TAcceptQueueServer::Task : public concurrency::Runnable 一个意思
- class TAcceptQueueServer::Task : public Runnable
- {
- public:
- Task(
- TAcceptQueueServer & server,
- shared_ptr<TProcessor> processor,
- shared_ptr<TProtocol> input,
- shared_ptr<TProtocol> output,
- shared_ptr<TTransport> transport)
- : server_(server)
- , processor_(std::move(processor))
- , input_(std::move(input))
- , output_(std::move(output))
- , transport_(std::move(transport))
- {
- }
-
-
- void run() override
- {
- //get EventHandler
- shared_ptr<TServerEventHandler> eventHandler = server_.getEventHandler();
- //eventHandler->CreateContext
- connectionContext = eventHandler->createContext(input_, output_);
- //event->processContext
- eventHandler->processContext(connectionContext, transport_);
-
- }
- }
- }
-
-
-
7 ThriftThreadFactory
- namespace apache {
- namespace thrift {
- namespace concurrency {
- class Runnable {
- public:
- virtual ~Runnable(){};
- virtual void run() = 0;
- virtual stdcxx::shared_ptr<Thread> thread() { return thread_.lock(); }
- virtual void thread(stdcxx::shared_ptr<Thread> value) { thread_ = value; }
-
- private:
- stdcxx::weak_ptr<Thread> thread_;
- };
- class ThreadFactory {
- protected:
- ThreadFactory(bool detached) : detached_(detached) { }
- }
- }
-
-
- class ThriftThreadFactory : public apache::thrift::concurrency::ThreadFactory
- {
-
- }
四 调用过程
1 ImpalaServer->TProcessor
- // Initialize the client servers.
- shared_ptr<ImpalaServer> handler = shared_from_this();
- if (beeswax_port > 0 || (TestInfo::is_test() && beeswax_port == 0))
- {
- //ImpalaServiceProcessor
- shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
- }
2 ImpalaServer->TProcessorEventHandler
- // Initialize the client servers.
- shared_ptr<ImpalaServer> handler = shared_from_this();
- if (beeswax_port > 0 || (TestInfo::is_test() && beeswax_port == 0))
- {
- //ImpalaServiceProcessor
- shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
- //ImpalaServiceProcessor
- shared_ptr<TProcessorEventHandler> event_handler(new RpcEventHandler("ImpalaServiceProcessor", exec_env_->metrics()));
-
- //设置事件句柄(event handler)
- beeswax_processor->setEventHandler(event_handler);
3 ThriftServerBuilder->ThriftServer
- namespace impala
- {
- /// Helper class to build new ThriftServer instances.
- class ThriftServerBuilder
- {
- public:
- ThriftServerBuilder(const std::string & name, const std::shared_ptr<apache::thrift::TProcessor> & processor, int port)
- : name_(name), processor_(processor), port_(port)
- {
- }
- std::string name_;
-
- //contain TProcessor
- std::shared_ptr<apache::thrift::TProcessor> processor_;
- int port_ = 0;
- /// Constructs a new ThriftServer and puts it in 'server', if construction was
- /// successful, returns an error otherwise. In the error case, 'server' will not have
- /// been set and will not need to be freed, otherwise the caller assumes ownership of
- /// '*server'.
- Status Build(ThriftServer ** server)
- {
- std::unique_ptr<ThriftServer> ptr(new ThriftServer(
- name_,
- processor_,
- port_,
- auth_provider_,
- metrics_,
- max_concurrent_connections_,
- queue_timeout_ms_,
- idle_poll_period_ms_,
- server_transport_type_));
- if (enable_ssl_)
- {
- RETURN_IF_ERROR(ptr->EnableSsl(version_, certificate_, private_key_, pem_password_cmd_, ciphers_));
- }
- (*server) = ptr.release();
- return Status::OK();
- }
- }
- }
-
-
- // Initialize the client servers.
- shared_ptr<ImpalaServer> handler = shared_from_this();
- if (beeswax_port > 0 || (TestInfo::is_test() && beeswax_port == 0))
- {
- //ImpalaServiceProcessor
- shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
- //ImpalaServiceProcessor
- shared_ptr<TProcessorEventHandler> event_handler(new RpcEventHandler("ImpalaServiceProcessor", exec_env_->metrics()));
-
- //设置事件句柄(event handler)
- beeswax_processor->setEventHandler(event_handler);
-
- //BEESWAX_SERVER_NAME beeswax_processor beeswax_port ThriftServerBuilder
- ThriftServerBuilder builder(BEESWAX_SERVER_NAME, beeswax_processor, beeswax_port);
-
- if (IsExternalTlsConfigured())
- {
- LOG(INFO) << "Enabling SSL for Beeswax";
- builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
- .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
- .ssl_version(ssl_version)
- .cipher_list(FLAGS_ssl_cipher_list);
- }
-
- ThriftServer * server;
- //build server
- //这是属于啥语法?——这是链式调用(方法链):在 builder 上依次调用各个设置方法,最后由 Build(&server) 构造出 ThriftServer
- RETURN_IF_ERROR(builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
- .metrics(exec_env_->metrics())
- .max_concurrent_connections(FLAGS_fe_service_threads)
- .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
- .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
- .Build(&server));
-
- //beeswax_server_
- beeswax_server_.reset(server);
- beeswax_server_->SetConnectionHandler(this);
4 ThriftServer Start
- namespace impala
- {
- class AuthProvider;
-
- /// Utility class for all Thrift servers. Runs a TAcceptQueueServer server with, by
- /// default, no enforced concurrent connection limit, that exposes the interface
- /// described by a user-supplied TProcessor object.
- ///
- /// Use a ThriftServerBuilder to construct a ThriftServer. ThriftServer's c'tors are
- /// private.
- /// TODO: shutdown is buggy (which only harms tests)
- class ThriftServer
- {
- friend class ThriftServerEventProcessor;
-
- /// Helper class that starts a server in a separate thread, and handles
- /// the inter-thread communication to monitor whether it started
- /// correctly.
- class ThriftServerEventProcessor : public apache::thrift::server::TServerEventHandler
- {
-
- Status ThriftServer::ThriftServerEventProcessor::StartAndWaitForServer()
- {
- //Thread Create
- RETURN_IF_ERROR(Thread::Create(
- "thrift-server", name.str(), &ThriftServer::ThriftServerEventProcessor::Supervise, this, &thrift_server_->server_thread_));
- return Status::OK();
- }
-
- void ThriftServer::ThriftServerEventProcessor::Supervise()
- {
- //serve -> run
- //server_ == TAcceptQueueServer
- thrift_server_->server_->serve();
-
- //进入 TAcceptQueueServer::serve(),服务拉起
- }
- }
-
-
- /// Thrift housekeeping
- //Contain TServer
- boost::scoped_ptr<apache::thrift::server::TServer> server_;
- /// Contain TProcessor
- std::shared_ptr<apache::thrift::TProcessor> processor_;
-
-
-
- void Start()
- {
- DCHECK(!started_);
- std::shared_ptr<TProtocolFactory> protocol_factory(new TBinaryProtocolFactory());
- std::shared_ptr<ThreadFactory> thread_factory(new ThriftThreadFactory("thrift-server", name_));
-
- // Note - if you change the transport types here, you must check that the
- // logic in createContext is still accurate.
- std::shared_ptr<TServerSocket> server_socket;
- std::shared_ptr<TTransportFactory> transport_factory;
- RETURN_IF_ERROR(CreateSocket(&server_socket));
- RETURN_IF_ERROR(auth_provider_->GetServerTransportFactory(transport_type_, metrics_name_, metrics_, &transport_factory));
-
- //set TServer by TAcceptQueueServer
- //且将 thread_factory .. 传递下去
- server_.reset(new TAcceptQueueServer(
- processor_,
- server_socket,
- transport_factory,
- protocol_factory,
- thread_factory,
- name_,
- max_concurrent_connections_,
- queue_timeout_ms_,
- idle_poll_period_ms_));
- if (metrics_ != NULL)
- {
- (static_cast<TAcceptQueueServer *>(server_.get()))->InitMetrics(metrics_, metrics_name_);
- }
- std::shared_ptr<ThriftServer::ThriftServerEventProcessor> event_processor(new ThriftServer::ThriftServerEventProcessor(this));
- server_->setServerEventHandler(event_processor);
-
- RETURN_IF_ERROR(event_processor->StartAndWaitForServer());
-
- // If port_ was 0, figure out which port the server is listening on after starting.
- port_ = server_socket->getPort();
- LOG(INFO) << "ThriftServer '" << name_ << "' started on port: " << port_ << (ssl_enabled() ? "s" : "");
- DCHECK(started_);
- return Status::OK();
- }
- }
-
-
- void ImpalaServer::Start()
- {
- ....
- if (beeswax_server_.get())
- {
- RETURN_IF_ERROR(beeswax_server_->Start());
- LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_server_->port();
- }
- }
-
- }
5 ThriftThread ThriftThreadFactory
- //将 Thrift-Server 的 ThriftThread 设置为 TAcceptQueueServer::Task
- std::shared_ptr<atc::Thread> ThriftThreadFactory::newThread(std::shared_ptr<atc::Runnable> runnable) const
- {
- stringstream name;
- name << prefix_ << "-" << count_.Add(1);
-
- //new ThriftThread
- std::shared_ptr<ThriftThread> result = std::shared_ptr<ThriftThread>(new ThriftThread(group_, name.str(), runnable));
- runnable->thread(result);
- return result;
- }
-
- //ThriftThread
- ThriftThread::ThriftThread(const string & group, const string & name, std::shared_ptr<atc::Runnable> runnable) : group_(group), name_(name)
- {
- // Sets this::runnable (and no, I don't know why it's not protected in atc::Thread)
- this->Thread::runnable(runnable);
- }
-
-
-
- thrift-server run
- void ThriftThread::start()
- {
- Promise<atc::Thread::id_t> promise;
- //ThriftThread 启动的时候会使用 runnable()
-
- Status status = impala::Thread::Create(group_, name_, bind(&ThriftThread::RunRunnable, this, runnable(), &promise), &impala_thread_);
-
- void ThriftThread::RunRunnable(std::shared_ptr<atc::Runnable> runnable, Promise<atc::Thread::id_t> * promise)
- {
- promise->Set(get_current());
- // Passing runnable in to this method (rather than reading from this->runnable())
- // ensures that it will live as long as this method, otherwise the ThriftThread could be
- // destroyed between the previous statement and this one (according to my reading of
- // PosixThread)
-
-
- //这里就会去调用 TAcceptQueueServer::Task::run()
- runnable->run();
- }
-
五 简要梳理
Main->ImpaladMain->ImpalaServer->ThriftServer
ThriftServer 依赖 TServer,创建了较多的对象(TProcessor、ThreadFactory)
ThriftServer/ThriftThreadFactory->ThreadFactory->ThriftThread
最后希望能够给学习 Impala 的同学带来一些帮助!