Impala:Impalad impala-server beeswax 调用过程及关系图

一 UML

UML

二 本文主要讲解过程

  1. main(daemon-main.cc)
  2. ImpaladMain/StatestoredMain/CatalogdMain/AdmissiondMain (impalad-main.cc)
    1. ExecEnv.init()
    2. impala_server = new ImpalaServer(exec_env)
    3. impala_server->Start(FLAGS_beeswax_port, FLAGS_hs2_port, FLAGS_hs2_http_port, FLAGS_external_fe_port)
    4. impala_server->Join()
  • // Initialize the client servers.
  • shared_ptr<ImpalaServer> handler = shared_from_this();
  • if (beeswax_port > 0 || (TestInfo::is_test() && beeswax_port == 0))
  • {
  • //ImpalaServiceProcessor
  • shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
  • //RpcEventHandler
  • shared_ptr<TProcessorEventHandler> event_handler(new RpcEventHandler("ImpalaServiceProcessor", exec_env_->metrics()));
  • //设置事件句柄
  • beeswax_processor->setEventHandler(event_handler);
  • //BEESWAX_SERVER_NAME beeswax_processor beeswax_port ThriftServerBuilder
  • ThriftServerBuilder builder(BEESWAX_SERVER_NAME, beeswax_processor, beeswax_port);
  • if (IsExternalTlsConfigured())
  • {
  • LOG(INFO) << "Enabling SSL for Beeswax";
  • builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
  • .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
  • .ssl_version(ssl_version)
  • .cipher_list(FLAGS_ssl_cipher_list);
  • }
  • ThriftServer * server;
  • //build server
  • //这是属于啥语法?——这是 Builder 模式的链式调用:依次调用各设置方法,最后由 Build() 返回 Status,交给 RETURN_IF_ERROR 宏检查
  • RETURN_IF_ERROR(builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
  • .metrics(exec_env_->metrics())
  • .max_concurrent_connections(FLAGS_fe_service_threads)
  • .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
  • .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
  • .Build(&server));
  • //beeswax_server_
  • beeswax_server_.reset(server);
  • beeswax_server_->SetConnectionHandler(this);
  • }
  • if (beeswax_server_.get())
  • {
  • RETURN_IF_ERROR(beeswax_server_->Start());
  • LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_server_->port();
  • }
展开

三 基础关系篇

1 BeeswaxServiceIf 系列

  • class ImpalaServiceIf : virtual public ::beeswax::BeeswaxServiceIf
  • class ImpalaServer : public ImpalaServiceIf,
  • public ImpalaHiveServer2ServiceIf,
  • public ThriftServer::ConnectionHandlerIf,
  • public std::enable_shared_from_this<ImpalaServer>,
  • public CacheLineAligned

2 TProcessorEventHandler

  • namespace apache {
  • namespace thrift {
  • /**
  • * Virtual interface class that can handle events from the processor. To
  • * use this you should subclass it and implement the methods that you care
  • * about. Your subclass can also store local data that you may care about,
  • * such as additional "arguments" to these methods (stored in the object
  • * instance's state).
  • */
  • class TProcessorEventHandler {
  • public:
  • virtual ~TProcessorEventHandler() {}
  • /**
  • * Called before calling other callback methods.
  • * Expected to return some sort of context object.
  • * The return value is passed to all other callbacks
  • * for that function invocation.
  • */
  • virtual void* getContext(const char* fn_name, void* serverContext) {
  • (void)fn_name;
  • (void)serverContext;
  • return NULL;
  • }
  • /**
  • * Expected to free resources associated with a context.
  • */
  • virtual void freeContext(void* ctx, const char* fn_name) {
  • (void)ctx;
  • (void)fn_name;
  • }
  • /**
  • * Called before reading arguments.
  • */
  • virtual void preRead(void* ctx, const char* fn_name) {
  • (void)ctx;
  • (void)fn_name;
  • }
  • /**
  • * Called between reading arguments and calling the handler.
  • */
  • virtual void postRead(void* ctx, const char* fn_name, uint32_t bytes) {
  • (void)ctx;
  • (void)fn_name;
  • (void)bytes;
  • }
  • /**
  • * Called between calling the handler and writing the response.
  • */
  • virtual void preWrite(void* ctx, const char* fn_name) {
  • (void)ctx;
  • (void)fn_name;
  • }
  • /**
  • * Called after writing the response.
  • */
  • virtual void postWrite(void* ctx, const char* fn_name, uint32_t bytes) {
  • (void)ctx;
  • (void)fn_name;
  • (void)bytes;
  • }
  • }
  • }
  • namespace impala
  • {
  • /// An RpcEventHandler is called every time an Rpc is started and completed. There is at
  • /// most one RpcEventHandler per ThriftServer. When an Rpc is started, getContext() creates
  • /// an InvocationContext recording the current time and other metadata for that invocation.
  • class RpcEventHandler : public apache::thrift::TProcessorEventHandler
  • {
  • public:
  • RpcEventHandler(const std::string & server_name, MetricGroup * metrics);
  • }
  • }
展开

3 TProcessor 系列

  • namespace apache {
  • namespace thrift {
  • /**
  • * A processor is a generic object that acts upon two streams of data, one
  • * an input and the other an output. The definition of this object is loose,
  • * though the typical case is for some sort of server that either generates
  • * responses to an input stream or forwards data from one pipe onto another.
  • *
  • */
  • class TProcessor {
  • public:
  • virtual ~TProcessor() {}
  • virtual bool process(stdcxx::shared_ptr<protocol::TProtocol> in,
  • stdcxx::shared_ptr<protocol::TProtocol> out,
  • void* connectionContext) = 0;
  • bool process(stdcxx::shared_ptr<apache::thrift::protocol::TProtocol> io, void* connectionContext) {
  • return process(io, io, connectionContext);
  • }
  • stdcxx::shared_ptr<TProcessorEventHandler> getEventHandler() const { return eventHandler_; }
  • void setEventHandler(stdcxx::shared_ptr<TProcessorEventHandler> eventHandler) {
  • eventHandler_ = eventHandler;
  • }
  • protected:
  • TProcessor() {}
  • stdcxx::shared_ptr<TProcessorEventHandler> eventHandler_;
  • };
  • }
  • namespace beeswax
  • {
  • stdcxx::shared_ptr<TProcessorEventHandler> eventHandler_;
  • //这里只是作了一层抽象
  • template <class Protocol_>
  • class TDispatchProcessorT : public TProcessor
  • {
  • }
  • template <class Protocol_>
  • class BeeswaxServiceProcessorT : public ::apache::thrift::TDispatchProcessorT<Protocol_>
  • {
  • protected:
  • ::apache::thrift::stdcxx::shared_ptr<BeeswaxServiceIf> iface_;
  • BeeswaxServiceProcessorT(::apache::thrift::stdcxx::shared_ptr<BeeswaxServiceIf> iface) :
  • iface_(iface) {}
  • }
  • }
  • namespace apache {
  • namespace thrift {
  • namespace protocol {
  • class TProtocol
  • {
  • virtual uint32_t writeBool_virt(const bool value) = 0;
  • virtual uint32_t writeByte_virt(const int8_t byte) = 0;
  • virtual uint32_t writeI16_virt(const int16_t i16) = 0;
  • virtual uint32_t writeI32_virt(const int32_t i32) = 0;
  • }
  • class TDummyProtocol : public TProtocol {};
  • }
  • }
  • }
  • namespace impala
  • {
  • template <class Protocol_>
  • class ImpalaServiceProcessorT : public ::beeswax::BeeswaxServiceProcessorT<Protocol_>
  • {
  • protected:
  • ::apache::thrift::stdcxx::shared_ptr<ImpalaServiceIf> iface_;
  • ImpalaServiceProcessorT(::apache::thrift::stdcxx::shared_ptr<ImpalaServiceIf> iface) :
  • ::beeswax::BeeswaxServiceProcessorT<Protocol_>(iface),
  • iface_(iface)
  • }
  • typedef ImpalaServiceProcessorT< ::apache::thrift::protocol::TDummyProtocol > ImpalaServiceProcessor;
  • }
展开

4 TServer

  • namespace apache {
  • namespace thrift {
  • namespace concurrency {
  • class Runnable {
  • public:
  • virtual ~Runnable(){};
  • virtual void run() = 0;
  • virtual stdcxx::shared_ptr<Thread> thread() { return thread_.lock(); }
  • virtual void thread(stdcxx::shared_ptr<Thread> value) { thread_ = value; }
  • private:
  • stdcxx::weak_ptr<Thread> thread_;
  • };
  • }
  • namespace apache {
  • namespace thrift {
  • namespace server {
  • class TServer : public concurrency::Runnable {
  • //Contain TProcessor TServerTransport TTransportFactory TProtocolFactory
  • TServer(const stdcxx::shared_ptr<TProcessor>& processor,
  • const stdcxx::shared_ptr<TServerTransport>& serverTransport,
  • const stdcxx::shared_ptr<TTransportFactory>& transportFactory,
  • const stdcxx::shared_ptr<TProtocolFactory>& protocolFactory)
  • : processorFactory_(new TSingletonProcessorFactory(processor)),
  • serverTransport_(serverTransport),
  • inputTransportFactory_(transportFactory),
  • outputTransportFactory_(transportFactory),
  • inputProtocolFactory_(protocolFactory),
  • outputProtocolFactory_(protocolFactory) {}
  • public:
  • virtual ~TServer() {}
  • virtual void serve() = 0;
  • virtual void stop() {}
  • // Allows running the server as a Runnable thread
  • virtual void run() { serve(); }
  • }
  • /**
  • * In TAcceptQueueServer, the main server thread calls accept() and then immediately
  • * places the returned TTransport on a queue to be processed by a separate thread,
  • * asynchronously.
  • *
  • * This helps solve IMPALA-4135, where connections were timing out while waiting in the
  • * OS accept queue, by ensuring that accept() is called as quickly as possible.
  • */
  • class TAcceptQueueServer : public TServer
  • {
  • void serve() override;
  • }
  • class TAcceptQueueServer::Task : public Runnable
  • {
  • void run() override
  • }
  • }
展开

5 ThreadPool

  • namespace apache {
  • namespace thrift {
  • namespace transport {
  • /**
  • * Generic interface for a method of transporting data. A TTransport may be
  • * capable of either reading or writing, but not necessarily both.
  • *
  • */
  • class TTransport {
  • protected:
  • /**
  • * Simple constructor.
  • */
  • TTransport() {}
  • public:
  • virtual bool isOpen() { return false; }
  • virtual void close() {
  • throw TTransportException(TTransportException::NOT_OPEN, "Cannot close base TTransport.");
  • }
  • uint32_t read(uint8_t* buf, uint32_t len) {
  • T_VIRTUAL_CALL();
  • return read_virt(buf, len);
  • }
  • void write(const uint8_t* buf, uint32_t len) {
  • T_VIRTUAL_CALL();
  • write_virt(buf, len);
  • }
  • }
  • namespace apache
  • {
  • namespace thrift
  • {
  • namespace server
  • {
  • using apache::thrift::TProcessor;
  • using apache::thrift::concurrency::Monitor;
  • using apache::thrift::concurrency::ThreadFactory;
  • using apache::thrift::transport::TServerTransport;
  • using apache::thrift::transport::TTransportFactory;
  • struct TAcceptQueueEntry
  • {
  • std::shared_ptr<TTransport> client_; //Client 封装
  • int64_t expiration_time_ = 0LL;
  • };
  • }
  • }
  • namespace impala
  • {
  • /// Simple threadpool which processes items (of type T) in parallel which were placed on a
  • /// blocking queue by Offer(). Each item is processed by a single user-supplied method.
  • template <typename T>
  • class ThreadPool : public CacheLineAligned
  • {
  • public:
  • /// Signature of a work-processing function. Takes the integer id of the thread which is
  • /// calling it (ids run from 0 to num_threads - 1) and a reference to the item to
  • /// process.
  • /// WorkFunction:处理单个工作项的回调类型,可以传入 Lambda 表达式
  • typedef boost::function<void(int thread_id, const T & workitem)> WorkFunction;
  • /// Creates a new thread pool without starting any threads. Code must call
  • /// Init() on this thread pool before any calls to Offer().
  • /// -- num_threads: how many threads are part of this pool
  • /// -- queue_size: the maximum size of the queue on which work items are offered. If the
  • /// queue exceeds this size, subsequent calls to Offer will block until there is
  • /// capacity available.
  • /// -- work_function: the function to run every time an item is consumed from the queue
  • /// -- fault_injection_eligible - If set to true, allow fault injection at this
  • /// callsite (see thread_creation_fault_injection). If set to false, fault
  • /// injection is disabled at this callsite. Thread creation sites that crash
  • /// Impala or abort startup must have this set to false.
  • ThreadPool(
  • const std::string & group,
  • const std::string & thread_prefix,
  • uint32_t num_threads,
  • uint32_t queue_size,
  • const WorkFunction & work_function,
  • bool fault_injection_eligible = false)
  • : group_(group)
  • , thread_prefix_(thread_prefix)
  • , num_threads_(num_threads)
  • , work_function_(work_function)
  • , work_queue_(queue_size)
  • , fault_injection_eligible_(fault_injection_eligible)
  • {
  • }
  • /// Create the threads needed for this ThreadPool. Returns an error on any
  • /// error spawning the threads.
  • Status Init()
  • {
  • for (int i = 0; i < num_threads_; ++i)
  • {
  • std::stringstream threadname;
  • threadname << thread_prefix_ << "(" << i + 1 << ":" << num_threads_ << ")";
  • std::unique_ptr<Thread> t;
  • Status status = Thread::Create(
  • group_,
  • threadname.str(),
  • boost::bind<void>(boost::mem_fn(&ThreadPool<T>::WorkerThread), this, i),
  • &t,
  • fault_injection_eligible_);
  • if (!status.ok())
  • {
  • // The thread pool initialization failed. Shutdown any threads that were
  • // spawned. Note: Shutdown() and Join() are safe to call multiple times.
  • Shutdown();
  • Join();
  • return status;
  • }
  • threads_.AddThread(std::move(t));
  • }
  • initialized_ = true;
  • return Status::OK();
  • }
  • /// Blocking operation that puts a work item on the queue. If the queue is full, blocks
  • /// until there is capacity available. The ThreadPool must be initialized before
  • /// calling this method.
  • //
  • /// 'work' is copied into the work queue, but may be referenced at any time in the
  • /// future. Therefore the caller needs to ensure that any data referenced by work (if T
  • /// is, e.g., a pointer type) remains valid until work has been processed, and it's up to
  • /// the caller to provide their own signalling mechanism to detect this (or to wait until
  • /// after DrainAndShutdown returns).
  • //
  • /// Returns true if the work item was successfully added to the queue, false otherwise
  • /// (which typically means that the thread pool has already been shut down).
  • template <typename V>
  • bool Offer(V && work)
  • {
  • DCHECK(initialized_);
  • return work_queue_.BlockingPut(std::forward<V>(work));
  • }
  • /// Blocks until the work item is placed on the queue or the timeout expires. The
  • /// ThreadPool must be initialized before calling this method. The same requirements
  • /// about the lifetime of 'work' applies as in Offer() above. If the operation times
  • /// out, the work item can be safely freed.
  • ///
  • /// Returns true if the work item was successfully added to the queue, false otherwise
  • /// (which means the operation timed out or the thread pool has already been shut down).
  • template <typename V>
  • bool Offer(V && work, int64_t timeout_millis)
  • {
  • DCHECK(initialized_);
  • int64_t timeout_micros = timeout_millis * MICROS_PER_MILLI;
  • return work_queue_.BlockingPutWithTimeout(std::forward<V>(work), timeout_micros);
  • }
  • private:
  • /// Driver method for each thread in the pool. Continues to read work from the queue
  • /// until the pool is shutdown.
  • //工作线程
  • void WorkerThread(int thread_id)
  • {
  • while (!IsShutdown())
  • {
  • T workitem;
  • if (work_queue_.BlockingGet(&workitem))
  • {
  • work_function_(thread_id, workitem);
  • }
  • if (work_queue_.Size() == 0)
  • {
  • /// Take lock to ensure that DrainAndShutdown() cannot be between checking
  • /// GetSize() and wait()'ing when the condition variable is notified.
  • /// (It will hang if we notify right before calling wait().)
  • std::unique_lock<std::mutex> l(lock_);
  • empty_cv_.NotifyAll();
  • }
  • }
  • }
  • uint32_t num_threads_;
  • /// User-supplied method to call to process each work item.
  • WorkFunction work_function_;
  • }
展开

6 TAcceptQueueServer

  • namespace apache {
  • namespace thrift {
  • namespace concurrency {
  • class Runnable {
  • public:
  • virtual ~Runnable(){};
  • virtual void run() = 0;
  • virtual stdcxx::shared_ptr<Thread> thread() { return thread_.lock(); }
  • virtual void thread(stdcxx::shared_ptr<Thread> value) { thread_ = value; }
  • private:
  • stdcxx::weak_ptr<Thread> thread_;
  • };
  • class ThreadFactory {
  • protected:
  • ThreadFactory(bool detached) : detached_(detached) { }
  • }
  • }
  • namespace apache {
  • namespace thrift {
  • namespace server {
  • class TServer : public concurrency::Runnable {
  • public:
  • virtual ~TServer() {}
  • virtual void serve() = 0;
  • virtual void stop() {}
  • // Allows running the server as a Runnable thread
  • virtual void run() { serve(); }
  • }
  • }
  • class TAcceptQueueServer : public TServer
  • {
  • std::shared_ptr<ThreadFactory> threadFactory_;
  • void TAcceptQueueServer::serve()
  • {
  • //SetupConnection 调用
  • //Thread Pool
  • //WorkFunction
  • ThreadPool<shared_ptr<TAcceptQueueEntry>> connection_setup_pool(
  • "setup-server",
  • "setup-worker",
  • FLAGS_accepted_cnxn_setup_thread_pool_size,
  • FLAGS_accepted_cnxn_queue_depth,
  • [this](int tid, const shared_ptr<TAcceptQueueEntry> & item) { this->SetupConnection(item); });
  • Status status = connection_setup_pool.Init();
  • while (!stop_)
  • {
  • try
  • {
  • // Fetch client from server
  • shared_ptr<TTransport> client = serverTransport_->accept();
  • TSocket * socket = reinterpret_cast<TSocket *>(client.get());
  • //new Connection
  • VLOG(1) << Substitute("New connection to server $0 from client $1", name_, socket->getSocketInfo());
  • shared_ptr<TAcceptQueueEntry> entry{new TAcceptQueueEntry};
  • entry->client_ = client;
  • if (queue_timeout_ms_ > 0)
  • {
  • entry->expiration_time_ = MonotonicMillis() + queue_timeout_ms_;
  • }
  • // New - the work done to set up the connection has been moved to SetupConnection.
  • // Note that we move() entry so it's owned by SetupConnection thread.
  • connection_setup_pool.Offer(std::move(entry);
  • }
  • }
  • }
  • //处理一个 Entry 即一个Client
  • void TAcceptQueueServer::SetupConnection(shared_ptr<TAcceptQueueEntry> entry)
  • {
  • //new Task
  • //获取参数 成员变量
  • TAcceptQueueServer::Task * task = new TAcceptQueueServer::Task(*this, processor, inputProtocol, outputProtocol, client);
  • // Create a task
  • shared_ptr<Runnable> runnable = shared_ptr<Runnable>(task);
  • // Create a thread for this task
  • //这里就相当关键了 会去调用 thrift-server newThread
  • //将 Thrift-Server 的 ThriftThread 设置为 TAcceptQueueServer::Task
  • shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(runnable));
  • tasks_.insert(task);
  • }
  • }
  • //跟 class TAcceptQueueServer::Task : public concurrency::Runnable 一个意思
  • class TAcceptQueueServer::Task : public Runnable
  • {
  • public:
  • Task(
  • TAcceptQueueServer & server,
  • shared_ptr<TProcessor> processor,
  • shared_ptr<TProtocol> input,
  • shared_ptr<TProtocol> output,
  • shared_ptr<TTransport> transport)
  • : server_(server)
  • , processor_(std::move(processor))
  • , input_(std::move(input))
  • , output_(std::move(output))
  • , transport_(std::move(transport))
  • {
  • }
  • void run() override
  • {
  • //get EventHandler
  • shared_ptr<TServerEventHandler> eventHandler = server_.getEventHandler();
  • //eventHandler->CreateContext
  • connectionContext = eventHandler->createContext(input_, output_);
  • //event->processContext
  • eventHandler->processContext(connectionContext, transport_);
  • }
  • }
  • }
展开

7 ThriftThreadFactory

  • namespace apache {
  • namespace thrift {
  • namespace concurrency {
  • class Runnable {
  • public:
  • virtual ~Runnable(){};
  • virtual void run() = 0;
  • virtual stdcxx::shared_ptr<Thread> thread() { return thread_.lock(); }
  • virtual void thread(stdcxx::shared_ptr<Thread> value) { thread_ = value; }
  • private:
  • stdcxx::weak_ptr<Thread> thread_;
  • };
  • class ThreadFactory {
  • protected:
  • ThreadFactory(bool detached) : detached_(detached) { }
  • }
  • }
  • class ThriftThreadFactory : public apache::thrift::concurrency::ThreadFactory
  • {
  • }

四 调用过程

1 ImpalaServer->TProcessor

  • // Initialize the client servers.
  • shared_ptr<ImpalaServer> handler = shared_from_this();
  • if (beeswax_port > 0 || (TestInfo::is_test() && beeswax_port == 0))
  • {
  • //ImpalaServiceProcessor
  • shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
  • }

2 ImpalaServer->TProcessorEventHandler

  • // Initialize the client servers.
  • shared_ptr<ImpalaServer> handler = shared_from_this();
  • if (beeswax_port > 0 || (TestInfo::is_test() && beeswax_port == 0))
  • {
  • //ImpalaServiceProcessor
  • shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
  • //RpcEventHandler
  • shared_ptr<TProcessorEventHandler> event_handler(new RpcEventHandler("ImpalaServiceProcessor", exec_env_->metrics()));
  • //设置事件句柄
  • beeswax_processor->setEventHandler(event_handler);

3 ThriftServerBuilder->ThriftServer

  • namespace impala
  • {
  • /// Helper class to build new ThriftServer instances.
  • class ThriftServerBuilder
  • {
  • public:
  • ThriftServerBuilder(const std::string & name, const std::shared_ptr<apache::thrift::TProcessor> & processor, int port)
  • : name_(name), processor_(processor), port_(port)
  • {
  • }
  • std::string name_;
  • //contain TProcessor
  • std::shared_ptr<apache::thrift::TProcessor> processor_;
  • int port_ = 0;
  • /// Constructs a new ThriftServer and puts it in 'server', if construction was
  • /// successful, returns an error otherwise. In the error case, 'server' will not have
  • /// been set and will not need to be freed, otherwise the caller assumes ownership of
  • /// '*server'.
  • Status Build(ThriftServer ** server)
  • {
  • std::unique_ptr<ThriftServer> ptr(new ThriftServer(
  • name_,
  • processor_,
  • port_,
  • auth_provider_,
  • metrics_,
  • max_concurrent_connections_,
  • queue_timeout_ms_,
  • idle_poll_period_ms_,
  • server_transport_type_));
  • if (enable_ssl_)
  • {
  • RETURN_IF_ERROR(ptr->EnableSsl(version_, certificate_, private_key_, pem_password_cmd_, ciphers_));
  • }
  • (*server) = ptr.release();
  • return Status::OK();
  • }
  • }
  • }
  • // Initialize the client servers.
  • shared_ptr<ImpalaServer> handler = shared_from_this();
  • if (beeswax_port > 0 || (TestInfo::is_test() && beeswax_port == 0))
  • {
  • //ImpalaServiceProcessor
  • shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
  • //RpcEventHandler
  • shared_ptr<TProcessorEventHandler> event_handler(new RpcEventHandler("ImpalaServiceProcessor", exec_env_->metrics()));
  • //设置事件句柄
  • beeswax_processor->setEventHandler(event_handler);
  • //BEESWAX_SERVER_NAME beeswax_processor beeswax_port ThriftServerBuilder
  • ThriftServerBuilder builder(BEESWAX_SERVER_NAME, beeswax_processor, beeswax_port);
  • if (IsExternalTlsConfigured())
  • {
  • LOG(INFO) << "Enabling SSL for Beeswax";
  • builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
  • .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
  • .ssl_version(ssl_version)
  • .cipher_list(FLAGS_ssl_cipher_list);
  • }
  • ThriftServer * server;
  • //build server
  • //这是属于啥语法?——这是 Builder 模式的链式调用:依次调用各设置方法,最后由 Build() 返回 Status,交给 RETURN_IF_ERROR 宏检查
  • RETURN_IF_ERROR(builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
  • .metrics(exec_env_->metrics())
  • .max_concurrent_connections(FLAGS_fe_service_threads)
  • .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
  • .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
  • .Build(&server));
  • //beeswax_server_
  • beeswax_server_.reset(server);
  • beeswax_server_->SetConnectionHandler(this);
展开

4 ThriftServer Start

  • namespace impala
  • {
  • class AuthProvider;
  • /// Utility class for all Thrift servers. Runs a TAcceptQueueServer server with, by
  • /// default, no enforced concurrent connection limit, that exposes the interface
  • /// described by a user-supplied TProcessor object.
  • ///
  • /// Use a ThriftServerBuilder to construct a ThriftServer. ThriftServer's c'tors are
  • /// private.
  • /// TODO: shutdown is buggy (which only harms tests)
  • class ThriftServer
  • {
  • friend class ThriftServerEventProcessor;
  • /// Helper class that starts a server in a separate thread, and handles
  • /// the inter-thread communication to monitor whether it started
  • /// correctly.
  • class ThriftServerEventProcessor : public apache::thrift::server::TServerEventHandler
  • {
  • Status ThriftServer::ThriftServerEventProcessor::StartAndWaitForServer()
  • {
  • //Thread Create
  • RETURN_IF_ERROR(Thread::Create(
  • "thrift-server", name.str(), &ThriftServer::ThriftServerEventProcessor::Supervise, this, &thrift_server_->server_thread_));
  • return Status::OK();
  • }
  • void ThriftServer::ThriftServerEventProcessor::Supervise()
  • {
  • //serve -> run
  • //server_ == TAcceptQueueServer
  • thrift_server_->server_->serve();
  • //go TAcceptQueueServer->run 服务拉起
  • }
  • }
  • /// Thrift housekeeping
  • //Contain TServer
  • boost::scoped_ptr<apache::thrift::server::TServer> server_;
  • /// Contain TProcessor
  • std::shared_ptr<apache::thrift::TProcessor> processor_;
  • void Start()
  • {
  • DCHECK(!started_);
  • std::shared_ptr<TProtocolFactory> protocol_factory(new TBinaryProtocolFactory());
  • std::shared_ptr<ThreadFactory> thread_factory(new ThriftThreadFactory("thrift-server", name_));
  • // Note - if you change the transport types here, you must check that the
  • // logic in createContext is still accurate.
  • std::shared_ptr<TServerSocket> server_socket;
  • std::shared_ptr<TTransportFactory> transport_factory;
  • RETURN_IF_ERROR(CreateSocket(&server_socket));
  • RETURN_IF_ERROR(auth_provider_->GetServerTransportFactory(transport_type_, metrics_name_, metrics_, &transport_factory));
  • //set TServer by TAcceptQueueServer
  • //且将 thread_factory .. 传递下去
  • server_.reset(new TAcceptQueueServer(
  • processor_,
  • server_socket,
  • transport_factory,
  • protocol_factory,
  • thread_factory,
  • name_,
  • max_concurrent_connections_,
  • queue_timeout_ms_,
  • idle_poll_period_ms_));
  • if (metrics_ != NULL)
  • {
  • (static_cast<TAcceptQueueServer *>(server_.get()))->InitMetrics(metrics_, metrics_name_);
  • }
  • std::shared_ptr<ThriftServer::ThriftServerEventProcessor> event_processor(new ThriftServer::ThriftServerEventProcessor(this));
  • server_->setServerEventHandler(event_processor);
  • RETURN_IF_ERROR(event_processor->StartAndWaitForServer());
  • // If port_ was 0, figure out which port the server is listening on after starting.
  • port_ = server_socket->getPort();
  • LOG(INFO) << "ThriftServer '" << name_ << "' started on port: " << port_ << (ssl_enabled() ? "s" : "");
  • DCHECK(started_);
  • return Status::OK();
  • }
  • }
  • void ImpalaServer::Start()
  • {
  • ....
  • if (beeswax_server_.get())
  • {
  • RETURN_IF_ERROR(beeswax_server_->Start());
  • LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_server_->port();
  • }
  • }
  • }
展开

5 ThriftThread ThriftThreadFactory

  • //将 Thrift-Server 的 ThriftThread 设置为 TAcceptQueueServer::Task
  • std::shared_ptr<atc::Thread> ThriftThreadFactory::newThread(std::shared_ptr<atc::Runnable> runnable) const
  • {
  • stringstream name;
  • name << prefix_ << "-" << count_.Add(1);
  • //new ThriftThread
  • std::shared_ptr<ThriftThread> result = std::shared_ptr<ThriftThread>(new ThriftThread(group_, name.str(), runnable));
  • runnable->thread(result);
  • return result;
  • }
  • //ThriftThread
  • ThriftThread::ThriftThread(const string & group, const string & name, std::shared_ptr<atc::Runnable> runnable) : group_(group), name_(name)
  • {
  • // Sets this::runnable (and no, I don't know why it's not protected in atc::Thread)
  • this->Thread::runnable(runnable);
  • }
  • //thrift-server run
  • void ThriftThread::start()
  • {
  • Promise<atc::Thread::id_t> promise;
  • //ThriftThread 启动时候会使用 runnable()
  • Status status = impala::Thread::Create(group_, name_, bind(&ThriftThread::RunRunnable, this, runnable(), &promise), &impala_thread_);
  • void ThriftThread::RunRunnable(std::shared_ptr<atc::Runnable> runnable, Promise<atc::Thread::id_t> * promise)
  • {
  • promise->Set(get_current());
  • // Passing runnable in to this method (rather than reading from this->runnable())
  • // ensures that it will live as long as this method, otherwise the ThriftThread could be
  • // destroyed between the previous statement and this one (according to my reading of
  • // PosixThread)
  • //这里就会去调用 TAcceptQueueServer::Task::run()
  • runnable->run();
  • }
展开

五 简要梳理

Main->ImpaladMain->ImpalaServer->ThriftServer

ThriftServer 依赖 TServer 创建了较多的对象(TProcessor、ThreadFactory)

ThriftServer/ThriftThreadFactory->ThreadFactory->ThriftThread

最后希望能够给学习 Impala 的同学带来一些帮助!

本站文章资源均来源自网络,除非特别声明,否则均不代表站方观点,并仅供查阅,不作为任何参考依据!
如有侵权请及时跟我们联系,本站将及时删除!
如遇版权问题,请查看 本站版权声明
THE END
分享
二维码
海报
<<上一篇
下一篇>>