lucene4.7源码研究之索引建立过程(3)-1

真正构造IndexWriter

IndexWriter indexWriter = new IndexWriter(directory, config);

构造过程

  1  public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
  2     conf.setIndexWriter(this); // prevent reuse by other instances 防止被其他实例使用,这个引用被设置在IndexWriterConfig中
  3     config = new LiveIndexWriterConfig(conf);//将IndexWriterConfig中的配置重新赋给LiveIndexWriterConfig,完全是this.x=config.getX(),既然new IndexWriterConfig是直接调用父类构造器,也就是LiveIndexWriterConfig,为什么还要重新来一遍呢?
  4     directory = d;//索引目录
  5     analyzer = config.getAnalyzer();//解析器
  6     infoStream = config.getInfoStream();//NoOutput
  7     mergePolicy = config.getMergePolicy();//合并策略
  8     mergePolicy.setIndexWriter(this);//同样在合并策略中设置一份该IndexWriter引用,同第2行一样,MergePolicy中同样维护了SetOnce<IndexWriter>
  9     mergeScheduler = config.getMergeScheduler();//merge调度器
 10     codec = config.getCodec();//header校验
 11 
 12     bufferedUpdatesStream = new BufferedUpdatesStream(infoStream);//在写线程flush的时候,所有的delete和update数据会放在该缓冲区
 13     poolReaders = config.getReaderPooling();//默认false
 14 
 15     writeLock = directory.makeLock(WRITE_LOCK_NAME);//创建写锁,write.lock
 16 
 17     if (!writeLock.obtain(config.getWriteLockTimeout())) // obtain write lock 获取写锁
 18       throw new LockObtainFailedException("Index locked for write: " + writeLock);
 19 
 20     boolean success = false;
 21     try {
 22       OpenMode mode = config.getOpenMode();//默认create and appen
 23       boolean create;
 24       if (mode == OpenMode.CREATE) {
 25         create = true;
 26       } else if (mode == OpenMode.APPEND) {
 27         create = false;
 28       } else {//默认条件
 29         // CREATE_OR_APPEND - create only if an index does not exist 如果索引文件不存在,则create=true
 30         create = !DirectoryReader.indexExists(directory);//判断索引文件是否存在根据index dir下是否含有segment文件
 31       }
 32 
 33       // If index is too old, reading the segments will throw
 34       // IndexFormatTooOldException.
 35       segmentInfos = new SegmentInfos();
 36 
 37       boolean initialIndexExists = true;
 38 
 39       if (create) {//如果索引文件不存在,策略为先读取该目录下segment文件,根据异常
 40         // Try to read first.  This is to allow create
 41         // against an index that‘s currently open for
 42         // searching.  In this case we write the next
 43         // segments_N file with no segments:
 44         try {
 45           segmentInfos.read(directory);//read方法见lucene之段元数据文件,lucene在针对segment读取有三种策略,不做详解,出现任何问题都会抛出IOException异常
 46           segmentInfos.clear();//确认完毕清空segment信息
 47         } catch (IOException e) {
 48           // Likely this means it‘s a fresh directory
 49           initialIndexExists = false;
 50         }
 51 
 52         // Record that we have a change (zero out all
 53         // segments) pending:
 54         changed();//indexwriter中changecount加1,segmentInfos中version加1
 55       } else {//如果已经存在索引文件
 56         segmentInfos.read(directory);//依然读取段信息
 57 
 58         IndexCommit commit = config.getIndexCommit();
 59         if (commit != null) {//初始化commit为null
 60           // Swap out all segments, but, keep metadata in
 61           // SegmentInfos, like version & generation, to
 62           // preserve write-once.  This is important if
 63           // readers are open against the future commit
 64           // points.
 65           if (commit.getDirectory() != directory)
 66             throw new IllegalArgumentException("IndexCommit‘s directory doesn‘t match my directory");
 67           SegmentInfos oldInfos = new SegmentInfos();
 68           oldInfos.read(directory, commit.getSegmentsFileName());
 69           segmentInfos.replace(oldInfos);
 70           changed();
 71           if (infoStream.isEnabled("IW")) {
 72             infoStream.message("IW", "init: loaded commit \"" + commit.getSegmentsFileName() + "\"");
 73           }
 74         }
 75       }
 76 
 77       rollbackSegments = segmentInfos.createBackupSegmentInfos();//for roll back
 78 
 79       // start with previous field numbers, but new FieldInfos
 80       globalFieldNumberMap = getFieldNumberMap();//加载field信息,fieldname,fieldnumber,docValue,解析如下
 81       config.getFlushPolicy().init(config);//初始化flush策略,设置config引用,获取NoOutput,init方法为同步方法
 82       docWriter = new DocumentsWriter(this, config, directory);//创建DocumentWriter对象,详细如下,主要初始化一个config中的配置
 83       eventQueue = docWriter.eventQueue();//获取事件队列,在上步中进行初始化,concurrentLinkedQueue
 84 
 85       // Default deleter (for backwards compatibility) is
 86       // KeepOnlyLastCommitDeleter:

########################################## 下一篇解析  ###########################################################
 87       synchronized(this) {//构建索引管理对象
 88         deleter = new IndexFileDeleter(directory,
 89                                        config.getIndexDeletionPolicy(),//默认策略为KeepOnlyLastCommitDeleter
 90                                        segmentInfos, infoStream, this,
 91                                        initialIndexExists);
 92       }
 93 
 94       if (deleter.startingCommitDeleted) {
 95         // Deletion policy deleted the "head" commit point.
 96         // We have to mark ourself as changed so that if we
 97         // are closed w/o any further changes we write a new
 98         // segments_N file.
 99         changed();
100       }
101 
102       if (infoStream.isEnabled("IW")) {
103         infoStream.message("IW", "init: create=" + create);
104         messageState();
105       }
106 
107       success = true;
108 
109     } finally {
110       if (!success) {
111         if (infoStream.isEnabled("IW")) {
112           infoStream.message("IW", "init: hit exception on init; releasing write lock");
113         }
114         IOUtils.closeWhileHandlingException(writeLock);
115         writeLock = null;
116       }
117     }
118   }

 

 

第2行:在IndexWriteConfig中维护一个SetOnce,并非Set集合,dexWriter时会把该引用存入SetOnce中,lucene给出的注释是防止其他实例调用

  // indicates whether this config instance is already attached to a writer.
  // not final so that it can be cloned properly.
  private SetOnce<IndexWriter> writer = new SetOnce<IndexWriter>();
  
  /**
   * Sets the {@link IndexWriter} this config is attached to.
   * 
   * @throws AlreadySetException
   *           if this config is already attached to a writer.
   */
  IndexWriterConfig setIndexWriter(IndexWriter writer) {
    this.writer.set(writer);//SetOnce保证了多线程的情况,IndexWriter对象引用会被set一次
    return this;
  }

以下是SetOnce的结构

/**
 * A convenient class which offers a semi-immutable object wrapper
 * implementation which allows one to set the value of an object exactly once,
 * and retrieve it many times. If {@link #set(Object)} is called more than once,
 * {@link AlreadySetException} is thrown and the operation
 * will fail.
 *
 * @lucene.experimental
 */
public final class SetOnce<T> implements Cloneable {

  /** Thrown when {@link SetOnce#set(Object)} is called more than once. */
  public static final class AlreadySetException extends IllegalStateException {
    public AlreadySetException() {
      super("The object cannot be set twice!");
    }
  }
  
  private volatile T obj = null;//volatile类型
  private final AtomicBoolean set;//原子boolean类型
  
  /**
   * A default constructor which does not set the internal object, and allows
   * setting it by calling {@link #set(Object)}.
   */
  public SetOnce() {
    set = new AtomicBoolean(false);//初始化false
  }

  /**
   * Creates a new instance with the internal object set to the given object.
   * Note that any calls to {@link #set(Object)} afterwards will result in
   * {@link AlreadySetException}
   *
   * @throws AlreadySetException if called more than once
   * @see #set(Object)
   */
  public SetOnce(T obj) {
    this.obj = obj;
    set = new AtomicBoolean(true);
  }
  
  /** Sets the given object. If the object has already been set, an exception is thrown. */
  public final void set(T obj) {//调用set
    if (set.compareAndSet(false, true)) {//更新原子boolean值,保证只有一个线程能够set成功
      this.obj = obj;//volatile类型保证对所有线程立即可见
    } else {//当其他线程检测到obj已经被设置,则抛出异常
      throw new AlreadySetException();
    }
  }
  
  /** Returns the object set by {@link #set(Object)}. */
  public final T get() {
    return obj;
  }
  
  @Override
  public SetOnce<T> clone() {
    return obj == null ? new SetOnce<T>() : new SetOnce<T>(obj);
  }

第17行,获取写锁,Lock

 /** Attempts to obtain an exclusive lock within amount of
   *  time given. Polls once per {@link #LOCK_POLL_INTERVAL}
   *  (currently 1000) milliseconds until lockWaitTimeout is
   *  passed.
   * @param lockWaitTimeout length of time to wait in
   *        milliseconds or {@link
   *        #LOCK_OBTAIN_WAIT_FOREVER} to retry forever
   * @return true if lock was obtained
   * @throws LockObtainFailedException if lock wait times out
   * @throws IllegalArgumentException if lockWaitTimeout is
   *         out of bounds
   * @throws IOException if obtain() throws IOException
   */
  public boolean obtain(long lockWaitTimeout) throws IOException {
    failureReason = null;
    boolean locked = obtain();//尝试获取锁,见下方代码
    if (lockWaitTimeout < 0 && lockWaitTimeout != LOCK_OBTAIN_WAIT_FOREVER)//
      throw new IllegalArgumentException("lockWaitTimeout should be LOCK_OBTAIN_WAIT_FOREVER or a non-negative number (got " + lockWaitTimeout + ")");

    long maxSleepCount = lockWaitTimeout / LOCK_POLL_INTERVAL;
    long sleepCount = 0;
    while (!locked) {//循环获取锁
      if (lockWaitTimeout != LOCK_OBTAIN_WAIT_FOREVER && sleepCount++ >= maxSleepCount) {//默认情况下是无限循环下去,
        String reason = "Lock obtain timed out: " + this.toString();
        if (failureReason != null) {
          reason += ": " + failureReason;
        }
        LockObtainFailedException e = new LockObtainFailedException(reason);
        if (failureReason != null) {
          e.initCause(failureReason);
        }
        throw e;
      }
      try {
        Thread.sleep(LOCK_POLL_INTERVAL);//睡眠间隔
      } catch (InterruptedException ie) {
        throw new ThreadInterruptedException(ie);
      }
      locked = obtain();
    }
    return locked;
  }

SimpleFSDirectory获取锁实现

@Override
  public boolean obtain() throws IOException {

    // Ensure that lockDir exists and is a directory:
    if (!lockDir.exists()) {//lockDir在Directory directory = FSDirectory.open的时候就已经设置好了,确定索引目录
      if (!lockDir.mkdirs())//如果没有就创建,创建失败就抛异常
        throw new IOException("Cannot create directory: " +
                              lockDir.getAbsolutePath());
    } else if (!lockDir.isDirectory()) {//非目录抛异常
      // TODO: NoSuchDirectoryException instead?
      throw new IOException("Found regular file where directory expected: " + 
                            lockDir.getAbsolutePath());
    }
    return lockFile.createNewFile();//创建write.lock,返回创建结果,利用文件系统限制并发操作
  }

 

第80行,

  /**
   * Loads or returns the already loaded the global field number map for this {@link SegmentInfos}.
   * If this {@link SegmentInfos} has no global field number map the returned instance is empty
   */
  private FieldNumbers getFieldNumberMap() throws IOException {
    final FieldNumbers map = new FieldNumbers();//

    for(SegmentCommitInfo info : segmentInfos) {
      for(FieldInfo fi : SegmentReader.readFieldInfos(info)) {
        map.addOrGet(fi.name, fi.number, fi.getDocValuesType());
      }
    }

    return map;
  }

FieldNumbers作为FieldInfos的一个静态内部类,维护以下3个hashmap,主要涉及到了field的三个属性

1.fieldname,顾名思义,2.fileNumber,fileNumber使用一个byte的8位记录了该filed的属性,是否索引,是否存储,是否含有docValue等等,详情已在域元文件解析文章中说明,3.docValueType

    private final Map<Integer,String> numberToName;
    private final Map<String,Integer> nameToNumber;
    // We use this to enforce that a given field never
    // changes DV type, even across segments / IndexWriter
    // sessions:
    private final Map<String,DocValuesType> docValuesType;//key:fieldname,value:docValueType

提供同步addOrGet接口,支持多线程

/**
     * Returns the global field number for the given field name. If the name
     * does not exist yet it tries to add it with the given preferred field
     * number assigned if possible otherwise the first unassigned field number
     * is used as the field number.
     */
    synchronized int addOrGet(String fieldName, int preferredFieldNumber, DocValuesType dvType) {
      if (dvType != null) {
        DocValuesType currentDVType = docValuesType.get(fieldName);//docValueType 使用byte记录类型,前4位为标准类型,后4位为docValues类型,lucene支持4种docValueType,分别为BINARY,NUMBERIC,STORED,STORED_SET
        if (currentDVType == null) {
          docValuesType.put(fieldName, dvType);
        } else if (currentDVType != null && currentDVType != dvType) {
          throw new IllegalArgumentException("cannot change DocValues type from " + currentDVType + " to " + dvType + " for field \"" + fieldName + "\"");
        }
      }
      Integer fieldNumber = nameToNumber.get(fieldName);//判断是否已经含有该field
      if (fieldNumber == null) {
        final Integer preferredBoxed = Integer.valueOf(preferredFieldNumber);

        if (preferredFieldNumber != -1 && !numberToName.containsKey(preferredBoxed)) {//如果fieldnum不为-1,并且还没有存储该域值,则赋值给fieldnum
          // cool - we can use this number globally
          fieldNumber = preferredBoxed;
        } else {//否则,寻找一个新的fieldnumber,这个寻找的规则就是从-1开始逐步加1,直到numberToName中没有存储该域值,记录此时数值做为域值,因为numberToName的key是域值,而域值的范围根据8为bit的类型是有限的,相同类型的field数值是冲突的,所以有冲突的域值就做加1操作,作为累加之后的fieldnumber怎么在使用的时候去解析呢?
          // find a new FieldNumber
          while (numberToName.containsKey(++lowestUnassignedFieldNumber)) {//默认lowestUnassignedFieldNumber=-1,
            // might not be up to date - lets do the work once needed
          }
          fieldNumber = lowestUnassignedFieldNumber;
        }
        
        numberToName.put(fieldNumber, fieldName);
        nameToNumber.put(fieldName, fieldNumber);
      }

      return fieldNumber.intValue();//返回该值
    }

 

第82行,DocumentWriter初始化

DocumentsWriter(IndexWriter writer, LiveIndexWriterConfig config, Directory directory) {
    this.directory = directory;
    this.config = config;
    this.infoStream = config.getInfoStream();
    this.perThreadPool = config.getIndexerThreadPool();
    flushPolicy = config.getFlushPolicy();
    this.writer = writer;
    this.events = new ConcurrentLinkedQueue<Event>();//初始化了一个事件队列,并且是线程安全的
    flushControl = new DocumentsWriterFlushControl(this, config, writer.bufferedUpdatesStream);
  }

 

接上,初始化一个文档flush控制器DocumentWriterFlushControl

  DocumentsWriterFlushControl(DocumentsWriter documentsWriter, LiveIndexWriterConfig config, BufferedUpdatesStream bufferedUpdatesStream) {
    this.infoStream = config.getInfoStream();
    this.stallControl = new DocumentsWriterStallControl();
    this.perThreadPool = documentsWriter.perThreadPool;
    this.flushPolicy = documentsWriter.flushPolicy;
    this.config = config;
    this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
    this.documentsWriter = documentsWriter;
    this.bufferedUpdatesStream = bufferedUpdatesStream;//出现了,在IndexWriter构造的时候初始化了该属性,用于线程delete和update时候存放缓冲数据
  }

 

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。