#!/usr/bin/env python # -*- coding: utf-8 -*- import time class SnowflakeManager(object): """ Twitter的雪花算法实现 """ def __init__(self, start_time=1420041600000): self.start_time = start_time / 1000 # 以秒为单位 self.last_timestamp = -1 # 41 bits时间戳 self.timestamp_shift = 22 # 10 bits机器编号 self.machine_id_shift = 12 # 12 bits序列号 self.sequence_shift = 0 # 41 bits可以表示的最大值,2^41 - 1 self.max_timestamp = -1 ^ (-1 << self.timestamp_shift) # 10 bits可以表示的最大值,2^10 - 1 self.max_machine_id = -1 ^ (-1 << self.machine_id_shift) # 12 bits可以表示的最大值,2^12 - 1 self.max_sequence = -1 ^ (-1 << self.sequence_shift) # 机器编号和序列号暂时不使用,可以通过参数传入 self.machine_id = 0 self.sequence = 0 def next_id(self): timestamp = int(time.time()) if timestamp < self.last_timestamp: raise ValueError('Current timestamp is less than last timestamp.') if timestamp == self.last_timestamp: self.sequence = (self.sequence + 1) & self.max_sequence if self.sequence == 0: timestamp = self.til_next_millis(self.last_timestamp) else: self.sequence = 0 self.last_timestamp = timestamp return ((timestamp - int(self.start_time)) << self.timestamp_shift) | ( self.machine_id << self.machine_id_shift) | self.sequence def til_next_millis(self, last_timestamp): timestamp = int(time.time()) while timestamp <= last_timestamp: timestamp = int(time.time()) return timestamp