public_sentiment/web/manager/snowflake_manager.py

55 lines
1.8 KiB
Python
Raw Normal View History

2024-09-18 13:41:28 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
class SnowflakeManager(object):
"""
Twitter的雪花算法实现
"""
def __init__(self, start_time=1420041600000):
self.start_time = start_time / 1000 # 以秒为单位
self.last_timestamp = -1
# 41 bits时间戳
self.timestamp_shift = 22
# 10 bits机器编号
self.machine_id_shift = 12
# 12 bits序列号
self.sequence_shift = 0
# 41 bits可以表示的最大值2^41 - 1
self.max_timestamp = -1 ^ (-1 << self.timestamp_shift)
# 10 bits可以表示的最大值2^10 - 1
self.max_machine_id = -1 ^ (-1 << self.machine_id_shift)
# 12 bits可以表示的最大值2^12 - 1
self.max_sequence = -1 ^ (-1 << self.sequence_shift)
# 机器编号和序列号暂时不使用,可以通过参数传入
self.machine_id = 0
self.sequence = 0
def next_id(self):
timestamp = int(time.time())
if timestamp < self.last_timestamp:
raise ValueError('Current timestamp is less than last timestamp.')
if timestamp == self.last_timestamp:
self.sequence = (self.sequence + 1) & self.max_sequence
if self.sequence == 0:
timestamp = self.til_next_millis(self.last_timestamp)
else:
self.sequence = 0
self.last_timestamp = timestamp
return ((timestamp - int(self.start_time)) << self.timestamp_shift) | (
self.machine_id << self.machine_id_shift) | self.sequence
def til_next_millis(self, last_timestamp):
timestamp = int(time.time())
while timestamp <= last_timestamp:
timestamp = int(time.time())
return timestamp