Skip to content

Latest commit

 

History

History
698 lines (598 loc) · 26.9 KB

eos源码分析之三交易.md

File metadata and controls

698 lines (598 loc) · 26.9 KB

eos源码分析之三交易

一、交易的介绍


说明:最新的1.0及以上代码中已经移除了相关的cycle和shared部分


基本上区块链都是这几板斧,又轮到交易了。一般交易都是最复杂的部分,因为它涉及到网络,帐户,数据库,共识,内存池等等好多部分。EOS的交易数据结构主要有两种:

1、signed_transaction:用户发起的交易


2、deferred_transaction:延期交易,注释的说明这种交易可以返回错误给当事人。


EOS为了应对海量的交易,引入分片(shard)技术,即在区块中维护了一条私有区块链,将一个block分割成多个cycle(循环),每个cycle的生成时间很短,而且不用等待完整的block确认完成(3秒),生成后直接异步广播发送,这样,交易在很快就被确认了。在一个cycle中,如果存在有大量互不想干的交易,但么多核多线程技术将极大的提高交易的处理速度 。 这次仍然从客户端发起一笔交易开始来查看整个代码的流程,基本上是cleos发出Transaction到nodeos的HTTP接口,接口接收到消息,分析打包。

二、交易的具体过程

1、客户端发起交易:
无论是send_actions 还是 send_transaction最终都落到push_transaction

fc::variant push_transaction( signed_transaction& trx, int32_t extra_kcpu = 1000, packed_transaction::compression_type compression = packed_transaction::none ) {
   auto info = get_info();
   trx.expiration = info.head_block_time + tx_expiration;
   trx.set_reference_block(info.head_block_id);

   if (tx_force_unique) {
      trx.context_free_actions.emplace_back( generate_nonce() );
   }

   auto required_keys = determine_required_keys(trx);
   size_t num_keys = required_keys.is_array() ? required_keys.get_array().size() : 1;

   trx.max_kcpu_usage = (tx_max_cpu_usage + 1023)/1024;
   trx.max_net_usage_words = (tx_max_net_usage + 7)/8;

   if (!tx_skip_sign) {
     //生成交易
      sign_transaction(trx, required_keys);
   }

   if (!tx_dont_broadcast) {
     //调用HTTP接口,packed_transaction打包交易
      return call(push_txn_func, packed_transaction(trx, compression));
   } else {
      return fc::variant(trx);
   }
}
void sign_transaction(signed_transaction& trx, fc::variant& required_keys) {
   // TODO determine chain id
   fc::variants sign_args = {fc::variant(trx), required_keys, fc::variant(chain_id_type{})};
   const auto& signed_trx = call(wallet_host, wallet_port, wallet_sign_trx, sign_args);
   trx = signed_trx.as<signed_transaction>();
}

看到调用call,那么进去:
template<typename T>
fc::variant call( const std::string& server, uint16_t port,
                  const std::string& path,
                  const T& v ) { return eosio::client::http::call( server, port, path, fc::variant(v) ); }

template<typename T>
fc::variant call( const std::string& path,
                  const T& v ) { return eosio::client::http::call( host, port, path, fc::variant(v) ); }
//最终调用
fc::variant call( const std::string& server, uint16_t port,
                  const std::string& path,
                  const fc::variant& postdata ) {
try {
......

   while( endpoint_iterator != end ) {
      // Try each endpoint until we successfully establish a connection.
      tcp::socket socket(io_service);
      try {
         boost::asio::connect(socket, endpoint_iterator);
         endpoint_iterator = end;
      }
      ......

      // Form the request. We specify the "Connection: close" header so that the
      // server will close the socket after transmitting the response. This will
      // allow us to treat all data up until the EOF as the content.
      //组建请求的流
      boost::asio::streambuf request;
      std::ostream request_stream(&request);
      request_stream << "POST " << path << " HTTP/1.0\r\n";
      request_stream << "Host: " << server << "\r\n";
      request_stream << "content-length: " << postjson.size() << "\r\n";
      request_stream << "Accept: */*\r\n";
      request_stream << "Connection: close\r\n\r\n";
      request_stream << postjson;

      // Send the request.发送组织好的Request
      boost::asio::write(socket, request);

      // Read the response status line. The response streambuf will automatically
      // grow to accommodate the entire line. The growth may be limited by passing
      // a maximum size to the streambuf constructor.
      //读取并处理Response
      boost::asio::streambuf response;
      boost::asio::read_until(socket, response, "\r\n");

      // Check that response is OK.判断格式
      std::istream response_stream(&response);
      std::string http_version;
      response_stream >> http_version;
      unsigned int status_code;
      response_stream >> status_code;
      std::string status_message;
      std::getline(response_stream, status_message);
      FC_ASSERT( !(!response_stream || http_version.substr(0, 5) != "HTTP/"), "Invalid Response" );

      // Read the response headers, which are terminated by a blank line.
      boost::asio::read_until(socket, response, "\r\n\r\n");

      // Process the response headers.读取头
      std::string header;
      while (std::getline(response_stream, header) && header != "\r")
      {
      //      std::cout << header << "\n";
      }
      //      std::cout << "\n";

      std::stringstream re;
      // Write whatever content we already have to output.
      if (response.size() > 0)
         //   std::cout << &response;
         re << &response;

      // Read until EOF, writing data to output as we go.读取数据
      boost::system::error_code error;
      while (boost::asio::read(socket, response,
                               boost::asio::transfer_at_least(1), error))
         re << &response;

......
   }

   FC_ASSERT( !"unable to connect" );
} FC_CAPTURE_AND_RETHROW()  // error, "Request Path: ${server}:${port}${path}\nRequest Post Data: ${postdata}" ,
                        // ("server", server)("port", port)("path", path)("postdata", postdata) )
}

在HTTP的名空间里定义了一大堆的API接口,这里用的是:
const string chain_func_base = "/v1/chain";
const string push_txns_func = chain_func_base + "/push_transactions";(httpc.hpp中)
这里基本就相当于调用HTTP的接口了,也就是说cleos把相关的API调用及内容发到了服务端,也就是nodeos端。那就去服务端看看nodeos接收到请求后干了些什么,有什么具体的动作。上面使用的路径是chain, 所以到chain_api_plugin.cpp中去看。
在前面的整体NODEOS启动时,会有插件启动这个函数执行:
//最终调用下面这个宏,这个宏展开稍微有一些复杂,其实就是名字替换和参数转换   类似struct get_account_params {name account_name;};
#define CALL(api_name, api_handle, api_namespace, call_name, http_response_code) \
{std::string("/v1/" #api_name "/" #call_name), \
   [this, api_handle](string, string body, url_response_callback cb) mutable { \
          try { \
             if (body.empty()) body = "{}"; \
             auto result = api_handle.call_name(fc::json::from_string(body).as<api_namespace::call_name ## \_params>()); \
             cb(http_response_code, fc::json::to_string(result)); \
          } \
          ......//去除异常
       }}
//用下面两个宏来展开增加的API
#define CHAIN_RO_CALL(call_name, http_response_code) CALL(chain, ro_api, chain_apis::read_only, call_name, http_response_code)
#define CHAIN_RW_CALL(call_name, http_response_code) CALL(chain, rw_api, chain_apis::read_write, call_name, http_response_code)
void chain_api_plugin::plugin_startup() {
   ilog( "starting chain_api_plugin" );
   my.reset(new chain_api_plugin_impl(app().get_plugin<chain_plugin>().chain()));
   auto ro_api = app().get_plugin<chain_plugin>().get_read_only_api();
   auto rw_api = app().get_plugin<chain_plugin>().get_read_write_api();
//注册相关API
   app().get_plugin<http_plugin>().add_api({
      CHAIN_RO_CALL(get_info, 200),
      CHAIN_RO_CALL(get_block, 200),
      CHAIN_RO_CALL(get_account, 200),
      CHAIN_RO_CALL(get_code, 200),
      CHAIN_RO_CALL(get_table_rows, 200),
      CHAIN_RO_CALL(get_currency_balance, 200),
      CHAIN_RO_CALL(get_currency_stats, 200),
      CHAIN_RO_CALL(abi_json_to_bin, 200),
      CHAIN_RO_CALL(abi_bin_to_json, 200),
      CHAIN_RO_CALL(get_required_keys, 200),
      CHAIN_RW_CALL(push_block, 202),
      CHAIN_RW_CALL(push_transaction, 202),
      CHAIN_RW_CALL(push_transactions, 202)
   });
}

上面的api_handle.call_name展开后是:read_write::push_transaction_results read_write::push_transaction(),在chain_plugin.cpp文件中。
read_write::push_transaction_results read_write::push_transaction(const read_write::push_transaction_params& params) {
   packed_transaction pretty_input;
   auto resolver = make_resolver(this);
   try {
      abi_serializer::from_variant(params, pretty_input, resolver);
   } EOS_RETHROW_EXCEPTIONS(chain::packed_transaction_type_exception, "Invalid packed transaction")

   auto result = db.push_transaction(pretty_input, skip_flags);//这行是根本,db定义为chain_controller
#warning TODO: get transaction results asynchronously
   fc::variant pretty_output;
   abi_serializer::to_variant(result, pretty_output, resolver);
   return read_write::push_transaction_results{ result.id, pretty_output };
}

因为这个函数其实是调用的chain_controller的同名函数:
/**
 * Attempts to push the transaction into the pending queue
 *
 * When called to push a locally generated transaction, set the skip_block_size_check bit on the skip argument. This
 * will allow the transaction to be pushed even if it causes the pending block size to exceed the maximum block size.
 * Although the transaction will probably not propagate further now, as the peers are likely to have their pending
 * queues full as well, it will be kept in the queue to be propagated later when a new block flushes out the pending
 * queues.
 */
transaction_trace chain_controller::push_transaction(const packed_transaction& trx, uint32_t skip)
{ try {
   // If this is the first transaction pushed after applying a block, start a new undo session.
   // This allows us to quickly rewind to the clean state of the head block, in case a new block arrives.
   if( !\_pending_block ) {
      _start_pending_block();
   }

   return with_skip_flags(skip, [&]() {
      return \_db.with_write_lock([&]() {
         return _push_transaction(trx);
      });
   });
} EOS_CAPTURE_AND_RETHROW( transaction_exception ) }

看注释说得很清楚了,
如果是交易写入块后的第一个交易,是启动一个可撤销的Session,保证在新块到来时可以进行回滚:
void chain_controller::_start_pending_block( bool skip_deferred )
{
  //配置等待块
   FC_ASSERT( !\_pending_block );
   \_pending_block         = signed_block();
   \_pending_block_trace   = block_trace(\*\_pending_block);
   \_pending_block_session = \_db.start_undo_session(true);
  \_pending_block->regions.resize(1);
   \_pending_block_trace->region_traces.resize(1);

   _start_pending_cycle();//处理里块的cycle
   _apply_on_block_transaction();
   _finalize_pending_cycle();

   _start_pending_cycle();

   if ( !skip_deferred ) {
      _push_deferred_transactions( false );
      if (\_pending_cycle_trace && \_pending_cycle_trace->shard_traces.size() > 0 && \_pending_cycle_trace->shard_traces.back().transaction_traces.size() > 0) {
         _finalize_pending_cycle();
         _start_pending_cycle();
      }
   }
}
//下面的两个函数比较关键,一个是处理cycle,一个是添加交易,下面的英文注释说得也比较清楚
/**
 *  Wraps up all work for current shards, starts a new cycle, and
 *  executes any pending transactions
 */
void chain_controller::_start_pending_cycle() {
   // only add a new cycle if there are no cycles or if the previous cycle isn't empty
   if (\_pending_block->regions.back().cycles_summary.empty() ||
       (!\_pending_block->regions.back().cycles_summary.back().empty() &&
        !\_pending_block->regions.back().cycles_summary.back().back().empty()))
      \_pending_block->regions.back().cycles_summary.resize( \_pending_block->regions[0].cycles_summary.size() + 1 );


   \_pending_cycle_trace = cycle_trace();

   \_pending_cycle_trace->shard_traces.resize(\_pending_cycle_trace->shard_traces.size() + 1 );//当前分片

   auto& bcycle = \_pending_block->regions.back().cycles_summary.back();
   if(bcycle.empty() || !bcycle.back().empty())
      bcycle.resize( bcycle.size()+1 );
}
void chain_controller::_apply_on_block_transaction()
{
   \_pending_block_trace->implicit_transactions.emplace_back(_get_on_block_transaction());
   transaction_metadata mtrx(packed_transaction(\_pending_block_trace->implicit_transactions.back()), get_chain_id(), head_block_time(), optional<time_point>(), true /*is implicit*/);
   _push_transaction(std::move(mtrx));
}
//再处理一下deferred
vector<transaction_trace> chain_controller::_push_deferred_transactions( bool flush )
{
   FC_ASSERT( \_pending_block, " block not started" );

   if (flush && \_pending_cycle_trace && \_pending_cycle_trace->shard_traces.size() > 0) {
      // TODO: when we go multithreaded this will need a better way to see if there are flushable
      // deferred transactions in the shards
      auto maybe_start_new_cycle = [&]() {
         for (const auto &st: \_pending_cycle_trace->shard_traces) {
            for (const auto &tr: st.transaction_traces) {
               for (const auto &req: tr.deferred_transaction_requests) {
                  if ( req.contains<deferred_transaction>() ) {
                     const auto& dt = req.get<deferred_transaction>();
                     if ( fc::time_point(dt.execute_after) <= head_block_time() ) {
                        // force a new cycle and break out
                        _finalize_pending_cycle();
                        _start_pending_cycle();
                        return;
                     }
                  }
               }
            }
         }
      };

      maybe_start_new_cycle();
   }
 }

这里得重点看看下面这个函数:
void chain_controller::_finalize_pending_cycle()
{
   // prune empty shard
   if (!\_pending_block->regions.back().cycles_summary.empty() &&
       !\_pending_block->regions.back().cycles_summary.back().empty() &&
       \_pending_block->regions.back().cycles_summary.back().back().empty()) {
      \_pending_block->regions.back().cycles_summary.back().resize( \_pending_block->regions.back().cycles_summary.back().size() - 1 );
      \_pending_cycle_trace->shard_traces.resize(\_pending_cycle_trace->shard_traces.size() - 1 );
   }
   // prune empty cycle
   if (!\_pending_block->regions.back().cycles_summary.empty() &&
       \_pending_block->regions.back().cycles_summary.back().empty()) {
      \_pending_block->regions.back().cycles_summary.resize( \_pending_block->regions.back().cycles_summary.size() - 1 );
      \_pending_cycle_trace.reset();
      return;
   }

   for( int idx = 0; idx < \_pending_cycle_trace->shard_traces.size(); idx++ ) {
      auto& trace = \_pending_cycle_trace->shard_traces.at(idx);
      auto& shard = \_pending_block->regions.back().cycles_summary.back().at(idx);

      trace.finalize_shard();
      shard.read_locks.reserve(trace.read_locks.size());
      shard.read_locks.insert(shard.read_locks.end(), trace.read_locks.begin(), trace.read_locks.end());

      shard.write_locks.reserve(trace.write_locks.size());
      shard.write_locks.insert(shard.write_locks.end(), trace.write_locks.begin(), trace.write_locks.end());
   }

   _apply_cycle_trace(*\_pending_cycle_trace);
   \_pending_block_trace->region_traces.back().cycle_traces.emplace_back(std::move(*\_pending_cycle_trace));
   \_pending_cycle_trace.reset();
}

这里遇到的问题是,没有找到Cycle的周期性增加,对块内的分片也因此不是非常清楚。
现在接着回到交易,看前面调用了_push_transaction, 它有两个重载,前面的重载会在函数内调用后面的重载函数,即:

transaction_trace chain_controller::_push_transaction(const packed_transaction& packed_trx)
{ try {
......

   // 根据情况来分别打包普通交易和延迟交易
   if( mtrx.delay.count() == 0 ) {
      result = _push_transaction( std::move(mtrx) );
   } else {

      result = wrap_transaction_processing( std::move(mtrx),
                                            [this](transaction_metadata& meta) { return delayed_transaction_processing(meta); } );
   }

   // notify anyone listening to pending transactions
   //这个最终会调用connections的enqueue-queue_write-do_queue_write,然后发送广播消息
   on_pending_transaction(\_pending_transaction_metas.back(), packed_trx);

   \_pending_block->input_transactions.emplace_back(packed_trx);//插入到区块中
......

} FC_CAPTURE_AND_RETHROW( (transaction_header(packed_trx.get_transaction())) ) }

transaction_trace chain_controller::_push_transaction( transaction_metadata&& data )
{ try {
   auto process_apply_transaction = [this](transaction_metadata& meta) {
......
      /// TODO: move \_pending_cycle into db so that it can be undone if transation fails, for now we will apply
      /// the transaction first so that there is nothing to undo... this only works because things are currently
      /// single threaded
      // set cycle, shard, region etc
      meta.region_id = 0;
      meta.cycle_index = cyclenum;
      meta.shard_index = 0;
      return _apply_transaction( meta );//交易打入块中
   };
 //  wdump((transaction_header(data.trx())));
   return wrap_transaction_processing( move(data), process_apply_transaction );
} FC_CAPTURE_AND_RETHROW( ) }

经过上面的处理之后,最后通过_apply_transaction把交易最终打入块中:
//写入并执行交易
transaction_trace chain_controller::__apply_transaction( transaction_metadata& meta )
{ try {
   transaction_trace result(meta.id);

   for (const auto &act : meta.trx().context_free_actions) {
      apply_context context(\*this, \_db, act, meta);
      context.context_free = true;
      context.exec();//执行
      fc::move_append(result.action_traces, std::move(context.results.applied_actions));
      FC_ASSERT( result.deferred_transaction_requests.size() == 0 );
   }

   for (const auto &act : meta.trx().actions) {
      apply_context context(\*this, \_db, act, meta);
      context.exec();
      context.results.applied_actions.back().auths_used = act.authorization.size() - context.unused_authorizations().size();
      fc::move_append(result.action_traces, std::move(context.results.applied_actions));
      fc::move_append(result.deferred_transaction_requests, std::move(context.results.deferred_transaction_requests));
   }

   update_resource_usage(result, meta);

   update_permission_usage(meta);
   record_transaction(meta.trx());//保存到数据库
   return result;
} FC_CAPTURE_AND_RETHROW() }

transaction_trace chain_controller::_apply_transaction( transaction_metadata& meta ) { try {
   auto execute = [this](transaction_metadata& meta) -> transaction_trace {
      try {
         auto temp_session = \_db.start_undo_session(true);
         auto result =  __apply_transaction(meta);
......
      } catch (...) {
......
      }
   };

......
} FC_CAPTURE_AND_RETHROW( (transaction_header(meta.trx())) ) }

交易完成后,就需要打包到块并进行广播了.这里只简单说一下,在介绍区块和共识时再详细说明:
在producer_plugin插件中:
void producer_plugin::plugin_startup()
{ try {
   ilog("producer plugin:  plugin_startup() begin");
   chain::chain_controller& chain = app().get_plugin<chain_plugin>().chain();

   if (!my->_producers.empty())
   {
......
      my->schedule_production_loop();
   } else
......
   } FC_CAPTURE_AND_RETHROW() }


void producer_plugin_impl::schedule_production_loop() {
   //Schedule for the next second's tick regardless of chain state
   // If we would wait less than 50ms (1/10 of block_interval), wait for the whole block interval.
......

   //\_timer.expires_from_now(boost::posix_time::microseconds(time_to_next_block_time));
  \_timer.expires_from_now( boost::posix_time::microseconds(time_to_next_block_time) );
   //\_timer.async_wait(boost::bind(&producer_plugin_impl::block_production_loop, this));
   \_timer.async_wait( [&](const boost::system::error_code&){ block_production_loop(); } );
}

block_production_condition::block_production_condition_enum producer_plugin_impl::block_production_loop() {
   block_production_condition::block_production_condition_enum result;
   fc::mutable_variant_object capture;
   try
   {
      result = maybe_produce_block(capture);//生产块
   }
   catch( const fc::canceled_exception& )
   {
    ......
   }

   if(result != block_production_condition::produced && result == \_prev_result) {
      \_prev_result_count++;
   }
   else {
      \_prev_result_count = 1;
      \_prev_result = result;
      switch(result)
         {
         case block_production_condition::produced: {
            const auto& db = app().get_plugin<chain_plugin>().chain();
            auto producer  = db.head_block_producer();
......
            break;
         }
......
         }
   }
   schedule_production_loop();//循环调用
   return result;
}

block_production_condition::block_production_condition_enum producer_plugin_impl::maybe_produce_block(fc::mutable_variant_object& capture) {
   chain::chain_controller& chain = app().get_plugin<chain_plugin>().chain();
   fc::time_point now = fc::time_point::now();

   if (app().get_plugin<chain_plugin>().is_skipping_transaction_signatures()) {
      \_production_skip_flags |= skip_transaction_signatures;
   }
   // If the next block production opportunity is in the present or future, we're synced.
   if( \!\_production_enabled )
   {
      if( chain.get_slot_time(1) >= now )
         \_production_enabled = true;
      else
         return block_production_condition::not_synced;
   }

   // is anyone scheduled to produce now or one second in the future?
   uint32_t slot = chain.get_slot_at_time( now );
   if( slot == 0 )
   {
      capture("next_time", chain.get_slot_time(1));
      return block_production_condition::not_time_yet;
   }

   //
   // this assert should not fail, because now <= db.head_block_time()
   // should have resulted in slot == 0.
   //
   // if this assert triggers, there is a serious bug in get_slot_at_time()
   // which would result in allowing a later block to have a timestamp
   // less than or equal to the previous block
   //
   assert( now > chain.head_block_time() );

   auto scheduled_producer = chain.get_scheduled_producer( slot );
   // we must control the producer scheduled to produce the next block.
   if( \_producers.find( scheduled_producer ) == \_producers.end() )
   {
      capture("scheduled_producer", scheduled_producer);
      return block_production_condition::not_my_turn;
   }

   auto scheduled_time = chain.get_slot_time( slot );
   eosio::chain::public_key_type scheduled_key = chain.get_producer(scheduled_producer).signing_key;
   auto private_key_itr = \_private_keys.find( scheduled_key );

......

   //此处产生块
   auto block = chain.generate_block(
      scheduled_time,
      scheduled_producer,
      private_key_itr->second,
      \_production_skip_flags
      );

   capture("n", block.block_num())("t", block.timestamp)("c", now)("count",block.input_transactions.size())("id",string(block.id()).substr(8,8));

   app().get_plugin<net_plugin>().broadcast_block(block);//广播块消息
   return block_production_condition::produced;
}

产生块的代码也比较简单,在chain-controller.cpp中:
signed_block chain_controller::generate_block(
   block_timestamp_type when,
   account_name producer,
   const private_key_type& block_signing_private_key,
   uint32_t skip /* = 0 */
   )
{ try {
   return with_skip_flags( skip | created_block, [&](){
      return \_db.with_write_lock( [&](){
        //直接调用同名函数
         return _generate_block( when, producer, block_signing_private_key );
      });
   });
} FC_CAPTURE_AND_RETHROW( (when) ) }

signed_block chain_controller::_generate_block( block_timestamp_type when,
                                              account_name producer,
                                              const private_key_type& block_signing_key )
{ try {

   try {
     //检测并获取相关参数值
      FC_ASSERT( head_block_time() < (fc::time_point)when, "block must be generated at a timestamp after the head block time" );
      uint32_t skip     = \_skip_flags;
      uint32_t slot_num = get_slot_at_time( when );//获取当前生产者的位置
      FC_ASSERT( slot_num > 0 );
      account_name scheduled_producer = get_scheduled_producer( slot_num );//获得当前区块的生产者
      FC_ASSERT( scheduled_producer == producer );

      const auto& producer_obj = get_producer(scheduled_producer);

      //如果符合条件,创建一个未决的块
      if( !\_pending_block ) {
         _start_pending_block();
      }
         //完成Cycle的构建
      _finalize_pending_cycle();

      if( !(skip & skip_producer_signature) )
         FC_ASSERT( producer_obj.signing_key == block_signing_key.get_public_key(),
                    "producer key ${pk}, block key ${bk}", ("pk", producer_obj.signing_key)("bk", block_signing_key.get_public_key()) );

      //设置未决块转成正式块的相关参数
      \_pending_block->timestamp   = when;
      \_pending_block->producer    = producer_obj.owner;
      \_pending_block->previous    = head_block_id();
      \_pending_block->block_mroot = get_dynamic_global_properties().block_merkle_root.get_root();
      \_pending_block->transaction_mroot = \_pending_block_trace->calculate_transaction_merkle_root();
      \_pending_block->action_mroot = \_pending_block_trace->calculate_action_merkle_root();

      if( is_start_of_round( \_pending_block->block_num() ) ) {
         auto latest_producer_schedule = _calculate_producer_schedule();
         if( latest_producer_schedule != _head_producer_schedule() )
            \_pending_block->new_producers = latest_producer_schedule;
      }
      \_pending_block->schedule_version = get_global_properties().active_producers.version;

      if( !(skip & skip_producer_signature) )
         \_pending_block->sign( block_signing_key );

     //结束块并广播消息保存相关数据
      _finalize_block( *\_pending_block_trace, producer_obj );

      \_pending_block_session->push();

      auto result = move( *\_pending_block );

      clear_pending();

      if (!(skip&skip_fork_db)) {
         \_fork_db.push_block(result);//加入到链中
      }
      return result;
   } catch ( ... ) {
      clear_pending();

      elog( "error while producing block" );
      _start_pending_block();
      throw;
   }

} FC_CAPTURE_AND_RETHROW( (producer) ) }

交易基本就完成了,这块比较麻烦,而且有些细节其实资料和代码有此不匹配,留待后面继续解决。